PyTorch 1.8 チュートリアル : 強化学習 : マリオをプレーする RL エージェントを訓練する (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 04/26/2021 (1.8.1+cu102)
* 本ページは、PyTorch 1.8 Tutorials の以下のページを翻訳した上で適宜、補足説明したものです:
- Reinforcement Learning : Train a Mario-playing RL Agent
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
スケジュールは弊社 公式 Web サイト でご確認頂けます。
- お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
- ウェビナー運用には弊社製品「ClassCat® Webinar」を利用しています。
人工知能研究開発支援 | 人工知能研修サービス | テレワーク & オンライン授業を支援 |
PoC(概念実証)を失敗させないための支援 (本支援はセミナーに参加しアンケートに回答した方を対象としています。) |
◆ お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。
株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション |
E-Mail:sales-info@classcat.com ; WebSite: https://www.classcat.com/ ; Facebook |
強化学習 : マリオをプレーする RL エージェントを訓練する
このチュートリアルは深層強化学習の基礎を貴方に一通り説明します。最後には、(DDQN, Double 深層 Q-ネットワーク を利用して) 自身でゲームをプレーできる AI-powered マリオを実装します。
このチュートリアルのために RL の事前知識は必要ありませんが、これらの RL コンセプト に慣れて手引きとしてこの手軽な チートシート を持つことができます。full コードは ここ で利用可能です。
# !pip install gym-super-mario-bros==7.3.0
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os, copy
# Gym is an OpenAI toolkit for RL
import gym
from gym.spaces import Box
from gym.wrappers import FrameStack
# NES Emulator for OpenAI Gym
from nes_py.wrappers import JoypadSpace
# Super Mario environment for OpenAI Gym
import gym_super_mario_bros
RL 定義
- 環境 : エージェントが相互作用してそれから学習する世界。
- アクション $a$ : エージェントが環境にどのように応答するか。総ての可能なアクションのセットはアクション空間と呼ばれます。
- 状態 $s$ : 環境の現在の特質。環境がなり得る総ての可能な状態のセットは状態空間と呼ばれます。
- 報酬 (Reward) $r$ : 報酬は環境からエージェントへのキーとなるフィードバックです。それはエージェントを学習させてその未来のアクションを変更するように駆動させるものです。マルチ時間ステップに渡る報酬の集合は リターン (= Return) と呼ばれます。
- 最適アクション値関数 $(Q^*(s,a)$ : 状態 $s$ から始めて、任意のアクション $a$ を取り、それから各未来の時間ステップに対してリターンを最大化するアクションを取る場合に、期待されるリターンを与えます。$Q$ は状態のアクションの「品質 (= quality)」を表すと言われます。私達はこの関数を近似することを試みます。
環境
環境を初期化する
マリオでは、環境はチューブ (= tubes)、キノコ (= mushrooms) と他のコンポーネントから成ります。
マリオがアクションを行なうとき、環境は変更された (次の) 状態、報酬と他の情報とともに応答します。
# Initialize Super Mario environment
env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0")
# Limit the action-space to
# 0. walk right
# 1. jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])
env.reset()
next_state, reward, done, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")
(240, 256, 3), 0, False, {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'x_pos_screen': 40, 'y_pos': 79}
環境を前処理する
環境データは next_state でエージェントに返されます。上で見たように、各状態は [3, 240, 256] サイズ配列で表されます。しばしばそれはエージェントが必要とするよりも多くの情報です ; 例えば、マリオのアクションはパイプや空の色に依拠しません!
環境データをそれをエージェントに送る前に前処理するために Wrapper を使用します。
GrayScaleObservation は RGB 画像をグレースケールに変換するための一般的なラッパーです ; それを行なうことは有用な情報を失うことなく状態表現のサイズを減じます。今は各状態のサイズは : [1, 240, 256] です。
ResizeObservation は各観測を正方形画像にダウンサンプリングします。新しいサイズは : [1, 84, 84] です。
SkipFrame は gym.Wrapper から継承して step() 関数を実装するカスタム・ラッパーです。連続するフレームはそれほど変化しませんから、n-中間フレームをそれほど多くの情報を失うことなくスキップできます。n-th フレームは各スキップされたフレームに渡り累積された報酬を集積します。
FrameStack は環境の連続するフレームを学習モデルに供給するために単一の観測ポイントにスカッシュすることを可能にするラッパーです。この方法で、前の幾つかのフレーム内の移動の方向に基づいてマリオが着地したかジャンプしたかを識別することができます。
class SkipFrame(gym.Wrapper):
def __init__(self, env, skip):
"""Return only every `skip`-th frame"""
super().__init__(env)
self._skip = skip
def step(self, action):
"""Repeat action, and sum reward"""
total_reward = 0.0
done = False
for i in range(self._skip):
# Accumulate reward and repeat the same action
obs, reward, done, info = self.env.step(action)
total_reward += reward
if done:
break
return obs, total_reward, done, info
class GrayScaleObservation(gym.ObservationWrapper):
def __init__(self, env):
super().__init__(env)
obs_shape = self.observation_space.shape[:2]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def permute_orientation(self, observation):
# permute [H, W, C] array to [C, H, W] tensor
observation = np.transpose(observation, (2, 0, 1))
observation = torch.tensor(observation.copy(), dtype=torch.float)
return observation
def observation(self, observation):
observation = self.permute_orientation(observation)
transform = T.Grayscale()
observation = transform(observation)
return observation
class ResizeObservation(gym.ObservationWrapper):
def __init__(self, env, shape):
super().__init__(env)
if isinstance(shape, int):
self.shape = (shape, shape)
else:
self.shape = tuple(shape)
obs_shape = self.shape + self.observation_space.shape[2:]
self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)
def observation(self, observation):
transforms = T.Compose(
[T.Resize(self.shape), T.Normalize(0, 255)]
)
observation = transforms(observation).squeeze(0)
return observation
# Apply Wrappers to environment
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
env = FrameStack(env, num_stack=4)
環境に上のラッパーを適用した後、最終的なラップされた状態は一緒にスタックされた 4 グレースケール連続フレームから成ります、左側の画像で下で示されるようにです。マリオがアクションを行なう度に、環境はこの構造の状態で応答します。構造はサイズ [4, 84, 84] の 3-D 配列により表されます。
エージェント
ゲームのエージェントを表すためにクラス Mario を作成します。Mario は以下ができるべきです :
- (環境の) 現在の状態に基づいて最適なアクションポリシーに従って 行動する (= act)。
- 経験を 記憶する (= Remember)。Experience = (current state, current action, reward, next state) i.e. 経験 = (現在の状態、現在のアクション、報酬、次の状態)。Mario はアクションポリシーを更新するために経験をキャッシュして後で recall します。
- 時間につれてより良いアクションポリシーを 学習する。
class Mario:
def __init__():
pass
def act(self, state):
"""Given a state, choose an epsilon-greedy action"""
pass
def cache(self, experience):
"""Add the experience to memory"""
pass
def recall(self):
"""Sample experiences from memory"""
pass
def learn(self):
"""Update online action value (Q) function with a batch of experiences"""
pass
以下のセクションでは、Mario のパラメータを投入して関数を定義します。
Act
与えられた状態のために、エージェントは最も最適なアクション (exploit) かランダムアクション (explore) を行なうかを選択できます。
Mario は self.exploration_rate の可能性でランダムに探求します (= explore) ; exploit を選択するとき、彼は最も最適なアクションを提供するために (Learn セクションで実装された) MarioNet に依拠します。
class Mario:
def __init__(self, state_dim, action_dim, save_dir):
self.state_dim = state_dim
self.action_dim = action_dim
self.save_dir = save_dir
self.use_cuda = torch.cuda.is_available()
# Mario's DNN to predict the most optimal action - we implement this in the Learn section
self.net = MarioNet(self.state_dim, self.action_dim).float()
if self.use_cuda:
self.net = self.net.to(device="cuda")
self.exploration_rate = 1
self.exploration_rate_decay = 0.99999975
self.exploration_rate_min = 0.1
self.curr_step = 0
self.save_every = 5e5 # no. of experiences between saving Mario Net
def act(self, state):
"""
Given a state, choose an epsilon-greedy action and update value of step.
Inputs:
state(LazyFrame): A single observation of the current state, dimension is (state_dim)
Outputs:
action_idx (int): An integer representing which action Mario will perform
"""
# EXPLORE
if np.random.rand() < self.exploration_rate:
action_idx = np.random.randint(self.action_dim)
# EXPLOIT
else:
state = state.__array__()
if self.use_cuda:
state = torch.tensor(state).cuda()
else:
state = torch.tensor(state)
state = state.unsqueeze(0)
action_values = self.net(state, model="online")
action_idx = torch.argmax(action_values, axis=1).item()
# decrease exploration_rate
self.exploration_rate *= self.exploration_rate_decay
self.exploration_rate = max(self.exploration_rate_min, self.exploration_rate)
# increment step
self.curr_step += 1
return action_idx
キャッシュと Recall
これらの 2 つの関数はマリオの「メモリ」プロセスとしてサーブします。
cache(): マリオがアクションを遂行するたびに、経験をメモリにストアします。経験は現在の状態、遂行されたアクション、アクションからの報酬、次の状態、そしてゲームが終了したかどうかを含みます。
recall(): マリオはメモリからランダムに経験のバッチをサンプリングして、それをゲームを学習するために利用します。
class Mario(Mario): # subclassing for continuity
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.memory = deque(maxlen=100000)
self.batch_size = 32
def cache(self, state, next_state, action, reward, done):
"""
Store the experience to self.memory (replay buffer)
Inputs:
state (LazyFrame),
next_state (LazyFrame),
action (int),
reward (float),
done(bool))
"""
state = state.__array__()
next_state = next_state.__array__()
if self.use_cuda:
state = torch.tensor(state).cuda()
next_state = torch.tensor(next_state).cuda()
action = torch.tensor([action]).cuda()
reward = torch.tensor([reward]).cuda()
done = torch.tensor([done]).cuda()
else:
state = torch.tensor(state)
next_state = torch.tensor(next_state)
action = torch.tensor([action])
reward = torch.tensor([reward])
done = torch.tensor([done])
self.memory.append((state, next_state, action, reward, done,))
def recall(self):
"""
Retrieve a batch of experiences from memory
"""
batch = random.sample(self.memory, self.batch_size)
state, next_state, action, reward, done = map(torch.stack, zip(*batch))
return state, next_state, action.squeeze(), reward.squeeze(), done.squeeze()
学習する
Mario は内部的には DDQN アルゴリズム を利用しています。DDQN は 2 つの ConvNet - \(Q_{online}\) と \(Q_{target}\) - を利用します、これらは最適アクション値関数を独立に近似します。
私達の実装では、\(Q_{online}\) と \(Q_{target}\) に渡る特徴 generator features を共有しますが、各々のための分離した FC 分類器を維持します。\(\theta_{target}\) (\(Q_{target}\) のパラメータ) は backprop による更新処理を防ぐために凍結されます。代わりに、\(\theta_{online}\) で定期的に同期されます (これについて後で更に)。
ニューラルネットワーク
class MarioNet(nn.Module):
"""mini cnn structure
input -> (conv2d + relu) x 3 -> flatten -> (dense + relu) x 2 -> output
"""
def __init__(self, input_dim, output_dim):
super().__init__()
c, h, w = input_dim
if h != 84:
raise ValueError(f"Expecting input height: 84, got: {h}")
if w != 84:
raise ValueError(f"Expecting input width: 84, got: {w}")
self.online = nn.Sequential(
nn.Conv2d(in_channels=c, out_channels=32, kernel_size=8, stride=4),
nn.ReLU(),
nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2),
nn.ReLU(),
nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1),
nn.ReLU(),
nn.Flatten(),
nn.Linear(3136, 512),
nn.ReLU(),
nn.Linear(512, output_dim),
)
self.target = copy.deepcopy(self.online)
# Q_target parameters are frozen.
for p in self.target.parameters():
p.requires_grad = False
def forward(self, input, model):
if model == "online":
return self.online(input)
elif model == "target":
return self.target(input)
TD Estimate & TD ターゲット
学習には 2 つの値が伴います :
TD Estimate - 与えられた状態 $s$ のための予測された最適な \(Q^*\)
\[{TD}_e = Q_{online}^*(s,a)\]
TD ターゲット - 現在の報酬と次の状態 \(s'\) の推定された \(Q^*\) の集合
\[a' = argmax_{a} Q_{online}(s', a) \\
{TD}_t = r + \gamma Q_{target}^*(s',a')\]
次のアクション \(a'\) が何になるかを私達は知りませんので、次の状態 \(s'\) で \(Q_{online}\) を最大化するアクション \(a'\) を使用します。
ここでは勾配計算を無効にするために td_target() 上で @torch.no_grad() デコレータを使用していることに注意してください (何故ならば \(\theta_{target}\) 上で逆伝播する必要がないからです)。
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.gamma = 0.9
def td_estimate(self, state, action):
current_Q = self.net(state, model="online")[
np.arange(0, self.batch_size), action
] # Q_online(s,a)
return current_Q
@torch.no_grad()
def td_target(self, reward, next_state, done):
next_state_Q = self.net(next_state, model="online")
best_action = torch.argmax(next_state_Q, axis=1)
next_Q = self.net(next_state, model="target")[
np.arange(0, self.batch_size), best_action
]
return (reward + (1 - done.float()) * self.gamma * next_Q).float()
モデルを更新する
Marioが再生バッファから入力をサンプリングするとき、\(TD_t\) と \(TD_e\) を計算してパラメータ \(\theta_{online}\) を更新するために損失 \(Q_{online}\) を逆伝播します (\(\alpha\) は optimizer に渡される学習率 lr です)。
\[\theta_{online} \leftarrow \theta_{online} + \alpha \nabla(TD_e - TD_t)\]
\(\theta_{target}\) は逆伝播を通して更新されません。代わりに、\(\theta_{online}\) を \(\theta_{target}\) に定期的にコピーします
\[\theta_{target} \leftarrow \theta_{online}\]
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.optimizer = torch.optim.Adam(self.net.parameters(), lr=0.00025)
self.loss_fn = torch.nn.SmoothL1Loss()
def update_Q_online(self, td_estimate, td_target):
loss = self.loss_fn(td_estimate, td_target)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
def sync_Q_target(self):
self.net.target.load_state_dict(self.net.online.state_dict())
チェックポイントをセーブする
class Mario(Mario):
def save(self):
save_path = (
self.save_dir / f"mario_net_{int(self.curr_step // self.save_every)}.chkpt"
)
torch.save(
dict(model=self.net.state_dict(), exploration_rate=self.exploration_rate),
save_path,
)
print(f"MarioNet saved to {save_path} at step {self.curr_step}")
総てをまとめます
class Mario(Mario):
def __init__(self, state_dim, action_dim, save_dir):
super().__init__(state_dim, action_dim, save_dir)
self.burnin = 1e4 # min. experiences before training
self.learn_every = 3 # no. of experiences between updates to Q_online
self.sync_every = 1e4 # no. of experiences between Q_target & Q_online sync
def learn(self):
if self.curr_step % self.sync_every == 0:
self.sync_Q_target()
if self.curr_step % self.save_every == 0:
self.save()
if self.curr_step < self.burnin:
return None, None
if self.curr_step % self.learn_every != 0:
return None, None
# Sample from memory
state, next_state, action, reward, done = self.recall()
# Get TD Estimate
td_est = self.td_estimate(state, action)
# Get TD Target
td_tgt = self.td_target(reward, next_state, done)
# Backpropagate loss through Q_online
loss = self.update_Q_online(td_est, td_tgt)
return (td_est.mean().item(), loss)
ロギング
import numpy as np
import time, datetime
import matplotlib.pyplot as plt
class MetricLogger:
def __init__(self, save_dir):
self.save_log = save_dir / "log"
with open(self.save_log, "w") as f:
f.write(
f"{'Episode':>8}{'Step':>8}{'Epsilon':>10}{'MeanReward':>15}"
f"{'MeanLength':>15}{'MeanLoss':>15}{'MeanQValue':>15}"
f"{'TimeDelta':>15}{'Time':>20}\n"
)
self.ep_rewards_plot = save_dir / "reward_plot.jpg"
self.ep_lengths_plot = save_dir / "length_plot.jpg"
self.ep_avg_losses_plot = save_dir / "loss_plot.jpg"
self.ep_avg_qs_plot = save_dir / "q_plot.jpg"
# History metrics
self.ep_rewards = []
self.ep_lengths = []
self.ep_avg_losses = []
self.ep_avg_qs = []
# Moving averages, added for every call to record()
self.moving_avg_ep_rewards = []
self.moving_avg_ep_lengths = []
self.moving_avg_ep_avg_losses = []
self.moving_avg_ep_avg_qs = []
# Current episode metric
self.init_episode()
# Timing
self.record_time = time.time()
def log_step(self, reward, loss, q):
self.curr_ep_reward += reward
self.curr_ep_length += 1
if loss:
self.curr_ep_loss += loss
self.curr_ep_q += q
self.curr_ep_loss_length += 1
def log_episode(self):
"Mark end of episode"
self.ep_rewards.append(self.curr_ep_reward)
self.ep_lengths.append(self.curr_ep_length)
if self.curr_ep_loss_length == 0:
ep_avg_loss = 0
ep_avg_q = 0
else:
ep_avg_loss = np.round(self.curr_ep_loss / self.curr_ep_loss_length, 5)
ep_avg_q = np.round(self.curr_ep_q / self.curr_ep_loss_length, 5)
self.ep_avg_losses.append(ep_avg_loss)
self.ep_avg_qs.append(ep_avg_q)
self.init_episode()
def init_episode(self):
self.curr_ep_reward = 0.0
self.curr_ep_length = 0
self.curr_ep_loss = 0.0
self.curr_ep_q = 0.0
self.curr_ep_loss_length = 0
def record(self, episode, epsilon, step):
mean_ep_reward = np.round(np.mean(self.ep_rewards[-100:]), 3)
mean_ep_length = np.round(np.mean(self.ep_lengths[-100:]), 3)
mean_ep_loss = np.round(np.mean(self.ep_avg_losses[-100:]), 3)
mean_ep_q = np.round(np.mean(self.ep_avg_qs[-100:]), 3)
self.moving_avg_ep_rewards.append(mean_ep_reward)
self.moving_avg_ep_lengths.append(mean_ep_length)
self.moving_avg_ep_avg_losses.append(mean_ep_loss)
self.moving_avg_ep_avg_qs.append(mean_ep_q)
last_record_time = self.record_time
self.record_time = time.time()
time_since_last_record = np.round(self.record_time - last_record_time, 3)
print(
f"Episode {episode} - "
f"Step {step} - "
f"Epsilon {epsilon} - "
f"Mean Reward {mean_ep_reward} - "
f"Mean Length {mean_ep_length} - "
f"Mean Loss {mean_ep_loss} - "
f"Mean Q Value {mean_ep_q} - "
f"Time Delta {time_since_last_record} - "
f"Time {datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}"
)
with open(self.save_log, "a") as f:
f.write(
f"{episode:8d}{step:8d}{epsilon:10.3f}"
f"{mean_ep_reward:15.3f}{mean_ep_length:15.3f}{mean_ep_loss:15.3f}{mean_ep_q:15.3f}"
f"{time_since_last_record:15.3f}"
f"{datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S'):>20}\n"
)
for metric in ["ep_rewards", "ep_lengths", "ep_avg_losses", "ep_avg_qs"]:
plt.plot(getattr(self, f"moving_avg_{metric}"))
plt.savefig(getattr(self, f"{metric}_plot"))
plt.clf()
Let’s play!
このサンプルでは 10 エピソードの間訓練ループを実行しますが、Mario が彼の世界の方法を真に学習するためには、少なくとも 40,000 エピソードの間ループを実行することを提案します!
use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()
save_dir = Path("checkpoints") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
save_dir.mkdir(parents=True)
mario = Mario(state_dim=(4, 84, 84), action_dim=env.action_space.n, save_dir=save_dir)
logger = MetricLogger(save_dir)
episodes = 10
for e in range(episodes):
state = env.reset()
# Play the game!
while True:
# Run agent on the state
action = mario.act(state)
# Agent performs action
next_state, reward, done, info = env.step(action)
# Remember
mario.cache(state, next_state, action, reward, done)
# Learn
q, loss = mario.learn()
# Logging
logger.log_step(reward, loss, q)
# Update state
state = next_state
# Check if end of game
if done or info["flag_get"]:
break
logger.log_episode()
if e % 20 == 0:
logger.record(episode=e, epsilon=mario.exploration_rate, step=mario.curr_step)
以上