PyTorch 1.3 Tutorials : 強化学習 : 強化学習 (DQN) チュートリアル (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 01/14/2020 (1.3.1)
* 本ページは、PyTorch 1.3 Tutorials の以下のページを翻訳した上で適宜、補足説明したものです:
- Reinforcement Learning : Reinforcement Learning (DQN) Tutorial
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
強化学習 : 強化学習 (DQN) チュートリアル
このチュートリアルは OpenAI Gym からの CartPole-v0 タスク上で深層 Q 学習 (DQN) エージェントを訓練するために PyTorch をどのように使用するかを示します。
タスク
エージェントは 2 つのアクションの間で決定しなければなりません – カートを左か右へ動かしていきます – その結果それに取り付けられたポールが直立したままになるようにです。Gym web サイト で様々なアルゴリズムと可視化による公式リーダーボードを見つけることができます。
カートポール
エージェントが環境の現在の状態を観察してアクションを選択するとき、環境は新しい状態に移行し、そしてまたアクションの結果を示す報酬を返します。このタスクでは、総ての増分の時間ステップに対して報酬は +1 でそしてポールが極端に倒れるかカートが中心から離れて 2.4 ユニット以上移動すれば環境は停止します。これはシナリオのより良い遂行はより長い時間動作し、より大きなリターンを累積することを意味します。
CartPole タスクはエージェントへの入力が、環境状態 (位置, 速度, etc.) を表している 4 実数値であるように設計されています。けれども、ニューラルネットワークは単にシーン (= scene) を見ることによりタスクを解くことができますので、入力としてカートを中心とするスクリーンのパッチを使用します。このため、私達の結果は公式リーダーボードからのものとは直接には比較できません – 私達のタスクは遥かにより困難です。あいにくこれは訓練の速度を落とさせます、何故ならば総てのフレームをレンダリングしなければならないからです。
厳密に言えば、状態を現在のスクリーン・パッチと前のものとの間の違いとして提示します。これはエージェントに一つの画像からポールの速度を考慮に入れることを可能にします。
パッケージ
最初に、必要なパッケージをインポートしましょう。
第一に、環境のための gym が必要です (pip install gym を使用してインストールします)。また PyTorch からの次も使用します :
- ニューラルネットワーク (torch.nn)
- 最適化 (torch.optim)
- 自動微分 (torch.autograd)
- vision タスクのためのユティリティ (torchvision – 別パッケージ)
import gym import math import random import numpy as np import matplotlib import matplotlib.pyplot as plt from collections import namedtuple from itertools import count from PIL import Image import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F import torchvision.transforms as T env = gym.make('CartPole-v0').unwrapped # set up matplotlib is_ipython = 'inline' in matplotlib.get_backend() if is_ipython: from IPython import display plt.ion() # if gpu is to be used device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
再生メモリ
私達の DQN を訓練するために経験 (= experience) 再生メモリを使用します。それはエージェントが観測する遷移をストアし、このデータを後で再利用することを可能にします。それからランダムにサンプリングすることにより、バッチを構築する遷移は非相関的 (= decorrelated) です。これは DQN 訓練手続きを大きく安定させて改良することが示されています。
このために、2 つのクラスが必要となります :
- Transition – 環境内の単一の遷移を表わす名前付きタプル。それは後で記述されるようにスクリーン差異画像である状態を伴い、本質的には (state, action) ペアをそれらの (next_state, reward) 結果にマップします。
- ReplayMemory – 最近観測された遷移を保持する有界 (= bounded) サイズの循環バッファ。それはまた訓練のための遷移のランダムバッチの選択のための .sample() メソッドも実装します。
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward')) class ReplayMemory(object): def __init__(self, capacity): self.capacity = capacity self.memory = [] self.position = 0 def push(self, *args): """Saves a transition.""" if len(self.memory) < self.capacity: self.memory.append(None) self.memory[self.position] = Transition(*args) self.position = (self.position + 1) % self.capacity def sample(self, batch_size): return random.sample(self.memory, batch_size) def __len__(self): return len(self.memory)
さて、私達のモデルを定義しましょう。しかし最初に、DQN が何であるかを素早くまとめておきましょう。
DQN アルゴリズム
私達の環境は決定論的ですので、ここで提示される等式もまた単純化のために決定論的に定式化されます。強化学習の文献では、それらはまた環境内の確率的遷移 (= stochastic transitions) に渡る期待値も含みます。
私達の目的は割引され (= discounted)、累積された報酬 $R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t$ を最大化しようとするポリシーを訓練することです、ここで $R_{t_0}$ はリターン (= return) としても知られています。割引 (率), $\gamma$, は 0 と 1 の間の定数で総計が収束することを確かにします。それははっきりしない遠い未来からの報酬を十分に信頼できる近い未来のものよりもエージェントのための重要性を少ないものにします。
Q-学習の裏にある主要な考え方は、リターンが何になるかを私達に教えることができるような関数 $Q^*: State \times Action \rightarrow \mathbb{R}$ を持つのであれば、与えられた状態内でアクションを取る場合に、報酬を最大化するポリシーを簡単に構築できるということです :
\[
\pi^*(s) = \arg\!\max_a \ Q^*(s, a)
\]
けれども、私達は世界の総てについては知りませんので、$Q^*$ へのアクセスは持ちません。しかし、ニューラルネットワークはユニバーサル関数近似器ですから、単純に一つ作成してそれを $Q^*$ に似るように訓練することはできます。
訓練の更新ルールのために、あるポリシーのための総ての Q 関数はベルマン方程式に従うという事実を利用します :
\[
Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s'))
\]
等式の両側の間の差は TD 誤差 (temporal difference error) $\delta$ として知られています :
\[
\delta = Q(s, a) - (r + \gamma \max_a Q(s', a))
\]
この誤差を最小化するために、Huber 損失 を使用します。Huber 損失は誤差が小さいときは平均二乗誤差 (mean squared error) のように振る舞いますが、誤差が大きいときには平均絶対誤差 (mean absolute error) のように振る舞います - $Q$ の推定 (= estimates) が非常にノイズが多いときにこれは外れ値に対してそれをより堅固にします。再生メモリからサンプリングされた遷移のバッチ, $B$ に渡りこれを計算します :
\[
\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta) \\
\begin{split}\text{where} \quad \mathcal{L}(\delta) = \begin{cases}
\frac{1}{2}{\delta^2} & \text{for } |\delta| \le 1, \\
|\delta| - \frac{1}{2} & \text{otherwise.}
\end{cases}\end{split}
\]
Q-ネットワーク
私達のモデルは現在と前のスクリーンのパッチの間の差異を取る畳み込みニューラルネットワークです。それは 2 つの出力を持ち、$Q(s, \mathrm{left})$ と $Q(s, \mathrm{right})$ を表します (ここで $s$ はネットワークへの入力です)。要するに、ネットワークは現在の入力が与えられた場合に各アクションを取ることの期待されるリターンを予測しようとします。
class DQN(nn.Module): def __init__(self, h, w, outputs): super(DQN, self).__init__() self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2) self.bn1 = nn.BatchNorm2d(16) self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2) self.bn2 = nn.BatchNorm2d(32) self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2) self.bn3 = nn.BatchNorm2d(32) # Number of Linear input connections depends on output of conv2d layers # and therefore the input image size, so compute it. def conv2d_size_out(size, kernel_size = 5, stride = 2): return (size - (kernel_size - 1) - 1) // stride + 1 convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w))) convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h))) linear_input_size = convw * convh * 32 self.head = nn.Linear(linear_input_size, outputs) # Called with either one element to determine next action, or a batch # during optimization. Returns tensor([[left0exp,right0exp]...]). def forward(self, x): x = F.relu(self.bn1(self.conv1(x))) x = F.relu(self.bn2(self.conv2(x))) x = F.relu(self.bn3(self.conv3(x))) return self.head(x.view(x.size(0), -1))
入力抽出
下のコードは環境からレンダリングされた画像を抽出して処理するためのユティリティです。それは torchvision パッケージを使用します、これは画像変換を構成することを容易にします。ひとたびセルを実行すればそれはそれが抽出したサンプル・パッチを表示するでしょう。
resize = T.Compose([T.ToPILImage(), T.Resize(40, interpolation=Image.CUBIC), T.ToTensor()]) def get_cart_location(screen_width): world_width = env.x_threshold * 2 scale = screen_width / world_width return int(env.state[0] * scale + screen_width / 2.0) # MIDDLE OF CART def get_screen(): # Returned screen requested by gym is 400x600x3, but is sometimes larger # such as 800x1200x3. Transpose it into torch order (CHW). screen = env.render(mode='rgb_array').transpose((2, 0, 1)) # Cart is in the lower half, so strip off the top and bottom of the screen _, screen_height, screen_width = screen.shape screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)] view_width = int(screen_width * 0.6) cart_location = get_cart_location(screen_width) if cart_location < view_width // 2: slice_range = slice(view_width) elif cart_location > (screen_width - view_width // 2): slice_range = slice(-view_width, None) else: slice_range = slice(cart_location - view_width // 2, cart_location + view_width // 2) # Strip off the edges, so that we have a square image centered on a cart screen = screen[:, :, slice_range] # Convert to float, rescale, convert to torch tensor # (this doesn't require a copy) screen = np.ascontiguousarray(screen, dtype=np.float32) / 255 screen = torch.from_numpy(screen) # Resize, and add a batch dimension (BCHW) return resize(screen).unsqueeze(0).to(device) env.reset() plt.figure() plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(), interpolation='none') plt.title('Example extracted screen') plt.show()
訓練する
ハイパーパラメータとユティリティ
このセルは私達のモデルとその optimizer をインスタンス化して、幾つかのユティリティを定義します :
- select_action - は epsilon greedy ポリシーに従ってアクションを選択します。簡単に言えば、アクションを選択するために時にモデルを使用して、時に一つを一様にサンプリングします。ランダム・アクションを選択する確率は EPS_START で始まって EPS_END に向けて指数関数的に減衰します。EPS_DECAY は減衰率を制御します。
- plot_durations - 最後の 100 エピソードに渡る平均 (公式評価で使用される尺度) と一緒にエピソードの存続期間をプロットするためのヘルパーです。プロットはメインの訓練ループを含むセルの下部にあり、総てのエピソード後に更新します。
BATCH_SIZE = 128 GAMMA = 0.999 EPS_START = 0.9 EPS_END = 0.05 EPS_DECAY = 200 TARGET_UPDATE = 10 # Get screen size so that we can initialize layers correctly based on shape # returned from AI gym. Typical dimensions at this point are close to 3x40x90 # which is the result of a clamped and down-scaled render buffer in get_screen() init_screen = get_screen() _, _, screen_height, screen_width = init_screen.shape # Get number of actions from gym action space n_actions = env.action_space.n policy_net = DQN(screen_height, screen_width, n_actions).to(device) target_net = DQN(screen_height, screen_width, n_actions).to(device) target_net.load_state_dict(policy_net.state_dict()) target_net.eval() optimizer = optim.RMSprop(policy_net.parameters()) memory = ReplayMemory(10000) steps_done = 0 def select_action(state): global steps_done sample = random.random() eps_threshold = EPS_END + (EPS_START - EPS_END) * \ math.exp(-1. * steps_done / EPS_DECAY) steps_done += 1 if sample > eps_threshold: with torch.no_grad(): # t.max(1) will return largest column value of each row. # second column on max result is index of where max element was # found, so we pick action with the larger expected reward. return policy_net(state).max(1)[1].view(1, 1) else: return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long) episode_durations = [] def plot_durations(): plt.figure(2) plt.clf() durations_t = torch.tensor(episode_durations, dtype=torch.float) plt.title('Training...') plt.xlabel('Episode') plt.ylabel('Duration') plt.plot(durations_t.numpy()) # Take 100 episode averages and plot them too if len(durations_t) >= 100: means = durations_t.unfold(0, 100, 1).mean(1).view(-1) means = torch.cat((torch.zeros(99), means)) plt.plot(means.numpy()) plt.pause(0.001) # pause a bit so that plots are updated if is_ipython: display.clear_output(wait=True) display.display(plt.gcf())
訓練ループ
最後に、モデルを訓練するためのコードです。
ここで、貴方は最適化の単一ステップを遂行する optimize_model 関数を見つけることができます。それは最初にバッチをサンプリングし、総ての tensor を単一の一つに連結して、$Q(s_t, a_t)$ と $V(s_{t+1}) = \max_a Q(s_{t+1}, a)$ を計算し、そしてそれらを損失に結合します。定義から $s$ が終端状態の場合には $V(s) = 0$ を設定します。追加された安定性のための $V(s_{t+1})$ を計算するためにターゲットネットワークもまた使用します。ターゲット・ネットワークはその重みを殆どの時間凍結し続けますが、ポリシー・ネットワークの重みで時々更新されます。これは通常はステップのセット数ですが、単純化のためにエピソードを使用しましょう。
def optimize_model(): if len(memory) < BATCH_SIZE: return transitions = memory.sample(BATCH_SIZE) # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for # detailed explanation). This converts batch-array of Transitions # to Transition of batch-arrays. batch = Transition(*zip(*transitions)) # Compute a mask of non-final states and concatenate the batch elements # (a final state would've been the one after which simulation ended) non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.uint8) non_final_next_states = torch.cat([s for s in batch.next_state if s is not None]) state_batch = torch.cat(batch.state) action_batch = torch.cat(batch.action) reward_batch = torch.cat(batch.reward) # Compute Q(s_t, a) - the model computes Q(s_t), then we select the # columns of actions taken. These are the actions which would've been taken # for each batch state according to policy_net state_action_values = policy_net(state_batch).gather(1, action_batch) # Compute V(s_{t+1}) for all next states. # Expected values of actions for non_final_next_states are computed based # on the "older" target_net; selecting their best reward with max(1)[0]. # This is merged based on the mask, such that we'll have either the expected # state value or 0 in case the state was final. next_state_values = torch.zeros(BATCH_SIZE, device=device) next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach() # Compute the expected Q values expected_state_action_values = (next_state_values * GAMMA) + reward_batch # Compute Huber loss loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1)) # Optimize the model optimizer.zero_grad() loss.backward() for param in policy_net.parameters(): param.grad.data.clamp_(-1, 1) optimizer.step()
下で、メイン訓練ループを見つけられます。はじめに環境をリセットして状態 Tensor を初期化します。それから、アクションをサンプリングし、それを実行して、次のスクリーンと報酬 (常に 1) を観測し、そしてモデルを一度最適化します。エピソードが終わるとき (モデルが失敗するとき)、ループを再スタートします。
下で、num_episodes は小さく設定されています。ノートブックをダウンロードして、意味のある存続時間の改良のためには 300+ のような、更に多くのエピソードを実行してください。
num_episodes = 50 for i_episode in range(num_episodes): # Initialize the environment and state env.reset() last_screen = get_screen() current_screen = get_screen() state = current_screen - last_screen for t in count(): # Select and perform an action action = select_action(state) _, reward, done, _ = env.step(action.item()) reward = torch.tensor([reward], device=device) # Observe new state last_screen = current_screen current_screen = get_screen() if not done: next_state = current_screen - last_screen else: next_state = None # Store the transition in memory memory.push(state, action, next_state, reward) # Move to the next state state = next_state # Perform one step of the optimization (on the target network) optimize_model() if done: episode_durations.append(t + 1) plot_durations() break # Update the target network, copying all weights and biases in DQN if i_episode % TARGET_UPDATE == 0: target_net.load_state_dict(policy_net.state_dict()) print('Complete') env.render() env.close() plt.ioff() plt.show()
全体の結果としてのデータフローを示す図がここにあります。
ランダムあるいはポリシーに基づいてアクションが選択され、gym 環境から次のステップのサンプルを得ます。結果を再生メモリに記録して総ての反復で最適化ステップも実行します。最適化は新しいポリシーの訓練を行なうために再生メモリからランダムバッチを選びます。「より古い」target_net はまた期待される Q 値を計算するために最適化で使用されます ; それはそれを現在に保つために時々更新されます。
以上