nature 论文对应的源代码 2015年的代码 lua语言
pytorch 官方示例
代码结构
replay memory
DQN algorithm
input extraction (预处理数据
training:
- prepare
① 参数和网络初始化
② epsilon-greedy 动作选择
③ 可视化
④ 模型优化 - practical training
代码逐行理解
env = gym.make('SpaceInvaders-v0').unwrapped
#gym.make('xxx')获得的是包装后的类,增加了约束条件,而env.unwrapped可以得到最原始的类,该类不受step次数限制等约束
# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
Replay Memory
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
# namedtuple 具名元组
# Transition 是元组名称
# 'state', 'action', 'next_state', 'reward' 是元组中的元素的名称
#eg:
# t = Transition(s1,a1,n_s1, r1);
# print(t);
# output:
#Transition(state=s1, action=a1, next_state=n_s1, reward=r1)
class ReplayMemory(object):
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity) #初始化队列,运用deque函数,先进先出,队列中一致保持最新的 maxlen 个元素
def push(self, *args): #输入数据
self.memory.append(Transition(*args)) #push一个transition的对象进去
def sample(self, batch_size): #随机取样
return random.sample(self.memory, batch_size)
def __len__(self): #返回长度大小
return len(self.memory)
DQN algorithm
class DQN(nn.Module):
def __init__(self, h, w, outputs):# 图片高h宽w,outputs输出个数,和action个数相同
super(DQN, self).__init__()
self.conv1 = nn.Conv2d(4, 32, kernel_size=8, stride=4) # 第一个卷积层,4个特征输入,32个特征输出,卷积核大小为8,步长为4
self.bn1 = nn.BatchNorm2d(32)
self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
self.bn2 = nn.BatchNorm2d(64)
self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
self.bn3 = nn.BatchNorm2d(64)
def conv2d_size_out(size, kernel_size, stride):
return (size - (kernel_size - 1) - 1) // stride + 1 # 利用公式计算经过卷积核后的图像大小,公式示意图在贴在最末
convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w, 8, 4), 4, 2), 3, 1)
convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h, 8, 4), 4, 2), 3, 1)
linear_input_size = convw * convh * 64 # 每张图片的w*h*输出的图片张数64,得到输入到全连接层的元素的数量
self.l1 = nn.Linear(linear_input_size, 512)
self.l2 = nn.Linear(512, outputs)
def forward(self, x):
x = x.to(device) #放到相应的环境中 cpu/gpu
x = F.relu(self.bn1(self.conv1(x))) #一层卷积+BatchNorm+relu激活函数
x = F.relu(self.bn2(self.conv2(x)))
x = F.relu(self.bn3(self.conv3(x)))
x = F.relu(self.l1(x.view(x.size(0), -1))) # view函数相当于resize函数,改变张量维度,注意保持元素总数不变
return self.l2(x.view(-1, 512))
经过forward,层层处理的维度变化
Input extraction
# 游戏截图数据预处理
resize = T.Compose([T.ToPILImage(), # 张量转换为PILImage的形式
T.Grayscale(num_output_channels=1), # 对图片做灰度化处理
T.Resize((84, 84), interpolation=InterpolationMode.BICUBIC),
T.ToTensor()]) # PILImage数据重新转换为张量形式
def get_screen():
# Transpose it into torch order (CHW).
screen = env.render(mode='rgb_array').transpose((2, 0, 1))
# 经过transpose函数,调整维度,改变了数组的连续性,需要通过np.ascontiguousarray函数按行排列的方式重构数组排列。(不进行该操作,使用数组时可能会报错)
# 因为数据类型选择float,所以通过/255控制数据范围在0~1之间。
# 如果数据类型选择int,那么对于Image类型数据范围应该在0-255之间。
screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
# 定义dtype是浮点类型,给PILImage传递的参数必须在0到1之间,所以“/255”。如果是int型那么给PILImage传递的参数必须在0到255之间
screen = torch.from_numpy(screen) # 转换成张量的类型
# Resize, and add a batch dimension (BCHW)
return resize(screen).unsqueeze(0) # 增加一个维度,代表批次
# eg: (4,84,84)-->(1,4,84,84)
# 当把一个batch的数据打包在一起,则变为(32,4,84,84)
Training
# 参数和网络初始化
BATCH_SIZE = 32 # batch的大小
GAMMA = 0.99 # DQN 公式中的折扣因子
EPS_START = 1.0 # epsilon-greedy 开始时的数值
EPS_END = 0.1
EPS_DECAY = 10000 # epsilon的衰减度,数字越大衰减的越慢
TARGET_UPDATE = 10 # target network滞后于policy network 更新的步数,即policy network每更新10次,target network更新一次
#网络初始化
init_screen = get_screen() # 得到游戏截图,得到截图的宽,高
_, _, screen_height, screen_width = init_screen.shape
# Get number of actions from gym action space
n_actions = env.action_space.n # 获得action的个数
policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval() # evaluation 在验证集用来固定dropout? 把target网络改为测试模式,不在计算其梯度?
optimizer = optim.RMSprop(policy_net.parameters()) # 定义优化器
memory = ReplayMemory(100000)
steps_done = 0
def select_action(state):
global steps_done # 参与epsilon参数的衰减,随着step_done增加,epsilon减小
sample = random.random() # 产生0-1之间的随机数
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
# 判断 随机数与epsilon的大小,以决定探索还是经验
# 0到epsilon 选择探索
if sample > eps_threshold:
with torch.no_grad():
return policy_net(state).max(1)[1].view(1, 1)
else:
return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)
# max(1)[1]:获取输出数据中最大的数据
# 可视化部分
episode_durations = []
def plot_durations():
plt.figure(1) # 创建figure 标号1
plt.clf() # 清除画布,使其空白
durations_t = torch.tensor(episode_durations, dtype=torch.float) # 将数组进行tensor转化
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Take 100 episode averages and plot them too
# 每100个episode算一次均值
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1) # unfold 切片函数,Torch.Tensor.unfold(dimension待切片维度,size大小,step步长)
# eg: x=tensor([1,2,3,4,5]);x.unfold(0,2,1)=[[1,2],[2,3],[3,4],[4,5]]
means = torch.cat((torch.zeros(99), means)) # 保证数组连贯性,episode不到100时,其他位置补0
plt.plot(means.numpy())
plt.pause(0.001) # pause a bit so that plots are updated
# 每0.001s更新一次图标
# 优化器,模型优化
def optimize_model():
if len(memory) < BATCH_SIZE: # 进程刚开始,数据不够一个批次
return # 等数据够一个批次了,才开始往下走
transitions = memory.sample(BATCH_SIZE) # 从memory池子中随机采样一个batch的数据
batch = Transition(*zip(*transitions)) # 与前相同,处理成四列数据 (state列 action列 next state列 reward列)放到batch里
# Compute a mask of non-final states and concatenate the batch elements
# (a final state would've been the one after which simulation ended)
# 掩码:计算公式中的 yj
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)),
device=device, dtype=torch.bool) # 判断当前状态是否终结
non_final_next_states = torch.cat([s for s in batch.next_state if s is not None]) # 找到那些下一步不是none的state
state_batch = torch.cat(batch.state) # 把一个batch中的state拼接起来,拼接后维度变为[32,4,84,84]
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
state_action_values = policy_net(state_batch).gather(1, action_batch) # 获得tensor中每一组当前action所对应的value (函数示例图片在最后 video 1'26''56)
next_state_values = torch.zeros(BATCH_SIZE, device=device) # 将targetnet中计算的Q估初始化为0
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach() # 挑选出那些不是终结状态的state,去计算他们targetnet里nextstate的 Q value 即Q估
expected_state_action_values = (next_state_values * GAMMA) + reward_batch # 计算Q估*gamma+r得到yj
# Compute Huber loss
criterion = nn.MSELoss() # 均方差处理
loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1)) # 为保证维度一致,又增加了一个维度
# Optimize the model 反向梯度下降
optimizer.zero_grad()
loss.backward()
for param in policy_net.parameters():
param.grad.data.clamp_(-1, 1) # 梯度裁剪,让policynet里的参数做截断,截断在[-1,1]
# 为适应激活函数的定义域,大于1或小于-1时会梯度消失/爆炸
optimizer.step()
Start Training
def random_start(skip_steps=30, m=4):
env.reset() # 重新启动游戏
state_queue = deque([], maxlen=m) # 定义state和nextstate队列,通过deque函数,保证队列中的图片都是最新的四帧图片
next_state_queue = deque([], maxlen=m)
done = False # 如果在随机训练时游戏已经结束
for i in range(skip_steps):
if (i+1) <= m:
state_queue.append(get_screen()) # 循环的前四次,直接将截图放进state队列
elif m < (i + 1) <= 2*m:
next_state_queue.append(get_screen()) # 首次循环四次后,再接下来的四帧放到next state队列中
else:
state_queue.append(next_state_queue[0]) # 将next state的第一帧添加到state的最后一帧
next_state_queue.append(get_screen()) # 将再获得的新帧添加到next state
action = env.action_space.sample() # 在随机训练时间中,action的选择随机
_, _, done, _ = env.step(action)
if done:
break
return done, state_queue, next_state_queue
# Start Training
num_episodes = 10000 # 设定最大训练次数
m = 4 # 输入的截图,一组是四帧
for i_episode in range(num_episodes):
# Initialize the environment and state
done, state_queue, next_state_queue = random_start() # 随机开始训练时间,比如随意执行30帧后再开始训练,可以丰富训练数据
if done: # 随机训练时游戏已经结束,则本轮训练结束
continue
state = torch.cat(tuple(state_queue), dim=1) # 将state queue队列打包在一起,拼接成一个输入的state状态
for t in count(): # 计算执行了多少步,计数功能
reward = 0
m_reward = 0 # 每m帧的reward之和
# 每m帧完成一次action
action = select_action(state) # 将state放入select_action函数中,选择一个action
# 作者每m帧只选择一个action,这m帧中一直执行这一个action
for i in range(m):
_, reward, done, _ = env.step(action.item())
if not done:
next_state_queue.append(get_screen()) # 更新next state中的截图
else:
break
m_reward += reward
if not done:
next_state = torch.cat(tuple(next_state_queue), dim=1)
else:
next_state = None
m_reward = -150 # 游戏结束了,reward给负值,惩罚项
m_reward = torch.tensor([m_reward], device=device)
memory.push(state, action, next_state, m_reward)
state = next_state
optimize_model()
if done: # 判断游戏是否结束
episode_durations.append(t + 1) # 查看执行了多少步,并存到数组中
plot_durations() # 可视化步数
break
# Update the target network, copying all weights and biases in DQN
if i_episode % TARGET_UPDATE == 0: # 每执行 TARGET_UPDATE 个轮次,targetnet更新一次数值
target_net.load_state_dict(policy_net.state_dict())
torch.save(policy_net.state_dict(), 'weights/policy_net_weights_{0}.pth'.format(i_episode)) # 每次更新targetnet时,保留一份参数文件
print('Complete')
env.close()
torch.save(policy_net.state_dict(), 'weights/policy_net_weights.pth') # 保存最终参数文件
evaluation的部分的代码
关于卷积神经网络的图片解析
关于 state_action_values 函数的图片解析
原文地址:http://www.cnblogs.com/mario24678/p/16790836.html
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,请务用于商业用途!
3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员!
8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载
声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性