Getting Started with Reinforcement Learning: Week 1

This week covers the ε-greedy policy, Q-learning, and DQN: concept overviews, the key formulas, and simple code implementations.

Tabular RL

The ε-greedy policy (multi-armed bandit example)

Incremental update formula:

For a given arm $a$:

  • The reward received the $n$-th time action $a$ is selected is denoted $R_n(a)$.

  • The previous estimate of that action's expected reward is $Q_{n-1}(a)$.

  • After the $n$-th selection, the count is updated: $N_n(a)=N_{n-1}(a)+1$.

    The incremental update is then (a small code sketch follows below):

    $Q_n(a) = Q_{n-1}(a) + \frac{1}{N_n(a)}(R_n(a)-Q_{n-1}(a))$
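
A minimal sketch of this incremental mean (the function name update_estimate and the toy rewards are illustrative, not from the original notes):

```python
def update_estimate(q, n, reward):
    """Incremental mean: Q_n = Q_{n-1} + (R_n - Q_{n-1}) / N_n."""
    n += 1
    q += (reward - q) / n
    return q, n

# Toy usage: rewards 1.0, 0.0, 1.0 give running means 1.0, 0.5, 2/3.
q, n = 0.0, 0
for r in [1.0, 0.0, 1.0]:
    q, n = update_estimate(q, n, r)
    print(n, q)
```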

The ε-greedy policy

The basic idea:

  • With probability $1-\varepsilon$, choose the arm with the highest current estimated expected reward (exploitation).
  • With probability $\varepsilon$, choose an arm uniformly at random among all arms (exploration).

The ε-greedy action-selection rule (a toy bandit sketch follows the formula):

$$
A_t=
\begin{cases}
\arg\max_a Q_t(a), & \text{with probability } 1-\varepsilon \\
\text{a uniformly random arm}, & \text{with probability } \varepsilon
\end{cases}
$$
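
A toy ε-greedy bandit loop combining this rule with the incremental update above (a sketch; the arm probabilities and the value of ε are made up for illustration):

```python
import numpy as np

rng = np.random.default_rng(0)
true_means = np.array([0.2, 0.5, 0.8])   # hypothetical Bernoulli arm reward probabilities
K = len(true_means)
Q = np.zeros(K)                          # estimated value of each arm
N = np.zeros(K)                          # pull counts
epsilon = 0.1

for t in range(1000):
    # Explore with probability epsilon, otherwise exploit the current best estimate.
    if rng.random() < epsilon:
        a = int(rng.integers(K))
    else:
        a = int(np.argmax(Q))
    reward = float(rng.random() < true_means[a])   # Bernoulli reward
    N[a] += 1
    Q[a] += (reward - Q[a]) / N[a]                 # incremental update

print(Q)   # estimates roughly approach true_means; the best arm is pulled most often
```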

The Q-learning algorithm

Q-learning update rule

$Q(s_t,a_t) \leftarrow Q(s_t,a_t) + \alpha\big(r_t + \gamma \max_{a'} Q(s_{t+1},a') - Q(s_t,a_t)\big)$

where:

  • $\alpha \in (0,1]$ is the learning rate;
  • $r_t + \gamma \max_{a'} Q(s_{t+1},a')$ is called the TD target;
  • $\delta_t = r_t + \gamma \max_{a'} Q(s_{t+1},a') - Q(s_t,a_t)$ is called the TD error (a worked numeric example follows).
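
A worked example with made-up numbers: suppose $Q(s_t,a_t)=0.5$, $r_t=1$, $\gamma=0.9$, $\max_{a'}Q(s_{t+1},a')=0.8$, and $\alpha=0.1$. Then

$$
\begin{aligned}
\text{TD target} &= 1 + 0.9 \times 0.8 = 1.72,\\
\delta_t &= 1.72 - 0.5 = 1.22,\\
Q(s_t,a_t) &\leftarrow 0.5 + 0.1 \times 1.22 = 0.622.
\end{aligned}
$$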

Solving GridWorld with Q-learning (code implementation)

import numpy as np

class GridWorld:
    def __init__(self, n_rows=4, n_cols=4, terminal_states=None, trap_states=None):
        self.n_rows = n_rows
        self.n_cols = n_cols
        self.n_states = n_rows * n_cols

        # Goal cell(s) and trap cell(s), given as (row, col) tuples.
        self.terminal_states = terminal_states if terminal_states is not None else [(3, 3)]
        self.trap_states = trap_states if trap_states is not None else [(2, 3)]

        self.reset()

    def state_to_id(self, row, col):
        return row * self.n_cols + col

    def id_to_state(self, state_id):
        return divmod(state_id, self.n_cols)

    def reset(self):
        self.row, self.col = 0, 0
        return self.state_to_id(self.row, self.col)

    def step(self, action):
        # Moves that would leave the grid keep the agent in place.
        if action == 0:    # up
            next_row = max(self.row - 1, 0)
            next_col = self.col
        elif action == 1:  # right
            next_row = self.row
            next_col = min(self.col + 1, self.n_cols - 1)
        elif action == 2:  # down
            next_row = min(self.row + 1, self.n_rows - 1)
            next_col = self.col
        elif action == 3:  # left
            next_row = self.row
            next_col = max(self.col - 1, 0)

        self.row, self.col = next_row, next_col
        state = (self.row, self.col)
        state_id = self.state_to_id(self.row, self.col)

        # +1 for reaching the goal, -1 for a trap, small step penalty otherwise.
        if state in self.terminal_states:
            reward = 1.0
            done = True
        elif state in self.trap_states:
            reward = -1.0
            done = True
        else:
            reward = -0.01
            done = False

        return state_id, reward, done


def epsilon_greedy(Q, state, epsilon, n_actions):
    if np.random.rand() < epsilon:
        # Explore: pick a random action.
        return np.random.randint(n_actions)
    else:
        # Exploit: pick the action with the highest estimated Q-value.
        return np.argmax(Q[state])


def train_q_learning(env, num_episodes=1000, alpha=0.05, gamma=0.9,
                     epsilon_start=1.0, epsilon_end=0.1, epsilon_decay=0.995):
    n_states = env.n_states
    n_actions = 4

    Q = np.zeros((n_states, n_actions))
    episode_rewards = []

    epsilon = epsilon_start

    for ep in range(num_episodes):
        state = env.reset()
        total_reward = 0.0

        for step in range(100):
            action = epsilon_greedy(Q, state, epsilon, n_actions)
            next_state, reward, done = env.step(action)

            # Q-learning update: the TD target bootstraps from the greedy value of the next state.
            best_next_Q = np.max(Q[next_state])
            td_target = reward + gamma * best_next_Q
            td_error = td_target - Q[state, action]
            Q[state, action] += alpha * td_error

            state = next_state
            total_reward += reward

            if done:
                break

        epsilon = max(epsilon_end, epsilon * epsilon_decay)
        episode_rewards.append(total_reward)

        if (ep + 1) % 100 == 0:
            print(f"Episode {ep+1}, total_reward={total_reward:.3f}, epsilon={epsilon:.3f}")

    return Q, episode_rewards


env = GridWorld()
Q, rewards = train_q_learning(env)

Extensions

SARSA:

The on-policy counterpart of Q-learning.

  • The update rule becomes: $Q(s_t,a_t) \leftarrow Q(s_t,a_t) + \alpha\big(r_t + \gamma Q(s_{t+1},a_{t+1}) - Q(s_t,a_t)\big)$

  • Difference from Q-learning (see the SARSA sketch below):

    Q-learning uses $\max_{a'} Q(s', a')$, i.e. it assumes the next step always takes the greedy action → off-policy

    SARSA uses the next action $a_{t+1}$ that was actually selected → on-policy
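
A minimal SARSA training loop, assuming the GridWorld environment and epsilon_greedy helper defined above are in scope; train_sarsa and its hyperparameters are illustrative choices, not part of the original notes. The only change from train_q_learning is that the TD target bootstraps from the action the behavior policy actually picks next:

```python
def train_sarsa(env, num_episodes=1000, alpha=0.05, gamma=0.9, epsilon=0.1):
    Q = np.zeros((env.n_states, 4))
    for ep in range(num_episodes):
        state = env.reset()
        action = epsilon_greedy(Q, state, epsilon, 4)
        for step in range(100):
            next_state, reward, done = env.step(action)
            # The next action comes from the same ε-greedy behavior policy ...
            next_action = epsilon_greedy(Q, next_state, epsilon, 4)
            # ... and the TD target bootstraps from that action (on-policy).
            td_target = reward + (0.0 if done else gamma * Q[next_state, next_action])
            Q[state, action] += alpha * (td_target - Q[state, action])
            state, action = next_state, next_action
            if done:
                break
    return Q
```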

Monte Carlo (MC)

After a complete episode finishes, update using the actual return (a minimal MC sketch follows the bullets).

  • TD: biased, but more sample-efficient and more stable
  • MC: unbiased, but high variance
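
A minimal every-visit Monte Carlo update, assuming episode is a list of (state, action, reward) tuples collected by running the policy to termination; mc_update and the visit-count array N are illustrative names:

```python
import numpy as np

def mc_update(Q, N, episode, gamma=0.9):
    """Every-visit MC: move Q(s, a) toward the actual return observed after (s, a)."""
    G = 0.0
    # Walk the episode backwards so the return G accumulates correctly.
    for state, action, reward in reversed(episode):
        G = reward + gamma * G
        N[state, action] += 1
        # Incremental mean over observed returns, same form as the bandit update.
        Q[state, action] += (G - Q[state, action]) / N[state, action]
    return Q, N
```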

Deep RL (DQN)

Approximate $Q(s,a)$ with a neural network.

In DQN we define a network whose input is the state $s$ and whose output is the $Q$-value of every action: $[Q(s,a_1), Q(s,a_2), \dots, Q(s,a_N)]$,

written as $Q(s,a;\theta)$.

Update target

Same form as in Q-learning:
$$
y = r + \gamma \max_{a'} Q(s', a'; \theta^-)
$$

The loss function is:
$$
L(\theta) = \big(y - Q(s,a;\theta)\big)^2
$$
which pushes the current $Q(s,a;\theta)$ toward the target $y$.

Update $\theta$ by gradient descent:
$$
\theta \leftarrow \theta - \eta \nabla_\theta L(\theta)
$$

Experience replay

Maintain a large-capacity queue replay_buffer.

Each interaction with the environment produces a transition (s, a, r, s', done), which is stored via buffer.append(...).

During training: sample a random batch from the buffer, compute the TD targets on those samples, and take one gradient step.

Target network

In the Q-learning update, the target contains $\max_{a'} Q(s', a')$;

if the target is also computed with the parameters currently being trained, the target $y$ and the current output $Q(s,a;\theta)$ both change with $\theta$ at the same time, which makes training unstable;

so we keep a copy of the network as target_net, with parameters denoted $\theta^-$;

during training the target is:
$$
y = r + \gamma \max_{a'} Q(s', a'; \theta^-)
$$

  • At each step, only the policy network's parameters $\theta$ are updated;

  • every $C$ steps, $\theta$ is copied into $\theta^-$.

Full procedure

Initialization

  1. Initialize the Q network Q(s,a; θ) (policy_net)
  2. Initialize the target network Q(s,a; θ⁻) (target_net) with the same parameters as θ
  3. Create a replay buffer (a ring buffer / deque)
  4. Set the hyperparameters:
    • learning rate lr
    • discount factor γ
    • ε-greedy: ε_start, ε_end, ε_decay
    • batch_size
    • target_update_freq (how many steps between target_net syncs)

Training loop

At each step:

  1. Select an action (ε-greedy)

  2. Interact with the environment

    Obtain (s, a, r, s_next, done) and store it in the replay buffer

  3. Sample a batch from the buffer

  4. Compute the target y

    If done, then y = r[i]

    otherwise y = r[i] + γ * q_next

  5. Compute the current Q(s,a; θ)

  6. Compute the loss & backpropagate

    loss = MSE(y_batch, q_selected) or the Huber loss

    loss.backward()

    optimizer.step(); optimizer.zero_grad()

  7. Update ε

    epsilon = max(ε_end, ε * ε_decay)

  8. Periodically update the target network

    Every N steps: $\theta^- \leftarrow \theta$

  9. On done: reset the environment and start the next episode.

Code implementation on CartPole-v1

import gymnasium as gym
import numpy as np
import random
import collections
import torch
import torch.nn as nn
import torch.optim as optim

# 1. Q network
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

    def forward(self, x):
        return self.net(x)

# 2. Simple replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def push(self, s, a, r, s_next, done):
        self.buffer.append((s, a, r, s_next, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        s, a, r, s_next, done = zip(*batch)
        return (np.array(s),
                np.array(a),
                np.array(r, dtype=np.float32),
                np.array(s_next),
                np.array(done, dtype=np.float32))

    def __len__(self):
        return len(self.buffer)

env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
buffer = ReplayBuffer(capacity=10000)

gamma = 0.99
batch_size = 64
epsilon_start, epsilon_end, epsilon_decay = 1.0, 0.01, 0.995
epsilon = epsilon_start

target_update_freq = 500
step_count = 0

for episode in range(500):
    s, _ = env.reset()  # gymnasium's reset() returns (observation, info)
    total_reward = 0

    for t in range(1000):
        step_count += 1

        # ε-greedy action selection
        if random.random() < epsilon:
            a = env.action_space.sample()
        else:
            with torch.no_grad():
                s_tensor = torch.FloatTensor(s).unsqueeze(0)
                q_values = policy_net(s_tensor)
                a = q_values.argmax(dim=1).item()

        s_next, r, terminated, truncated, info = env.step(a)
        done = terminated or truncated
        total_reward += r

        buffer.push(s, a, r, s_next, done)
        s = s_next

        # Learning step
        if len(buffer) >= batch_size:
            s_b, a_b, r_b, s_next_b, done_b = buffer.sample(batch_size)

            s_b = torch.FloatTensor(s_b)
            a_b = torch.LongTensor(a_b).unsqueeze(1)
            r_b = torch.FloatTensor(r_b).unsqueeze(1)
            s_next_b = torch.FloatTensor(s_next_b)
            done_b = torch.FloatTensor(done_b).unsqueeze(1)

            # Current Q(s, a)
            q_values = policy_net(s_b).gather(1, a_b)

            # Target Q, computed with the frozen target network
            with torch.no_grad():
                next_q_values = target_net(s_next_b).max(dim=1, keepdim=True)[0]
                y = r_b + gamma * (1 - done_b) * next_q_values

            loss = nn.MSELoss()(q_values, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Sync the target network
        if step_count % target_update_freq == 0:
            target_net.load_state_dict(policy_net.state_dict())

        # Update epsilon
        epsilon = max(epsilon_end, epsilon * epsilon_decay)

        if done:
            print(f"Episode {episode+1}, reward={total_reward}, epsilon={epsilon:.3f}")
            break