PyTorch : Tutorial 中級 : PyTorch で分散アプリケーションを書く

PyTorch : Tutorial 中級 : PyTorch で分散アプリケーションを書く (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 05/16/2018 (0.4.0)

* 本ページは、PyTorch Intermidiate Tutorials の – Writing Distributed Applications with PyTorch を
動作確認・翻訳した上で適宜、補足説明したものです:

* サンプルコードの動作確認はしておりますが、適宜、追加改変している場合もあります。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

本文

この短いチュートリアルで、PyTorch の分散パッケージを説明していきます。分散設定をどのようにセットアップするか、異なる通信ストラテジーをどのように使用するかを見て、そしてパッケージの内部を説明します。

 

セットアップ

PyTorch に含まれる分散パッケージ (i.e., torch.distributed) は研究者と実践者 (= practitioners) に彼らの計算をマシンのプロセスとクラスタに渡り容易に並列化することを可能にします。それを行なうために、それは各プロセスにデータを任意の他のプロセスに通信することを可能にするメッセージ・パッシング・セマンティクスを利用します。マルチプロセッシング (torch.multiprocessing)・パッケージとは対照的に、プロセスは異なる通信バックエンドが利用可能で同じマシン上で実行されることに制限されません。

始めるためには複数プロセスを同時に実行するアビリティが必要です。貴方がコンピュータ・クラスタへのアクセスを持つ場合ローカル sysadmin に相談するか好みのコーディネーション・ツールを使用するべきです (e.g., pdsh, clustershell, その他)。このチュートリアルのためには、単一マシンを使用して次のテンプレートを使用して複数のプロセスを fork します。

"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_processes(rank, size, fn, backend='tcp'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_processes, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

上のスクリプトは 2 つのプロセスを spawn します、これはそれぞれ分散環境をセットアップして、プロセスグループを初期化して (dist.init_process_group)、そして最後に与えられた run 関数を実行します。

init_processes 関数を見てみましょう。それは同じ ip アドレスとポートを使用して、総てのプロセスがマスターを通してコーディネートできるでしょう。TCP バックエンドを使用しましたが、代わりに MPIGloo を使用することもできました (c.f. セクション 5.1)。このチュートリアルの最後に dist.init_process_group で起きるマジックを説明しますが、本質的にはそれはプロセスにそれらの位置を共有することにより互いに通信することを可能にします。

 

Point-to-Point 通信

1 つのプロセスから他の 1 つへのデータの転送は point-to-point 通信と呼ばれます。これらは send と recv 関数あるいはそれらの即時のカウンターパート isend と irecv を通して達成されます。

"""Blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

上のサンプルでは、両者のプロセスがゼロ tensor で開始され、それからプロセス 0 が tensor をインクリメントしてそれをプロセス 1 に送り結果としてそれら両者は 1.0 で終わります。プロセス 1 はそれが受け取るデータをストアするためにメモリを割り当てる必要があることに注意してください。

また send/recv は ブロッキング であることにも注意してください : 両者のプロセスは通信が完了するまで停止します。一方で immediates は ノンブロッキングです : スクリプトはその実行を続行してメソッドは DistributedRequest オブジェクトを返し、それの上で wait() するかを選択可能です。

"""Non-blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

immediates を使用するとき送られそして受け取った tensor の使用に注意しなければなりません。私達はデータが他のプロセスにいつ通信されるかを知りませんので、req.wait() が完了する前に送られた tensor を変更したり受け取った tensor にアクセスしたりするべきではありません。換言すれば :

  • dist.isend() 後に tensor に書くことは未定義な挙動の結果になります。
  • dist.irecv() 後に tensor から読むことは未定義な挙動の結果になります。

けれども、req.wait() が実行された後では、通信が発生して tensor[0] にストアされた値が 1.0 であることが保証されます。

Point-to-point 通信はプロセスの通信に渡るきめ細かい制御を望むとき有用です。それらは Baidu の DeepSpeechFacebook の巨大スケール実験 で使用されているもののように、上等なアルゴリズムを実装するために利用できます (c.f. Section 4.1)。

 

Collective 通信

Scatter Gather
Reduce All-Reduce
Broadcast All-Gather

point-to-point 通信とは対照的に、collectives はグループの総てのプロセスに渡る通信パターンを許容します。グループは総てのプロセスのサブセットです。グループを作成するために、rank のリストを dist.new_group(group) に渡すことができます。デフォルトでは、collectives は総てのプロセス上、world としてもまた知られています、で実行されます。例えば、総てのプロセスの総ての tensor の総計を得るために、dist.all_reduce(tensor, op, group) collective を使用できます。

""" All-Reduce example."""
def run(rank, size):
    """ Simple point-to-point communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

グループの総ての tensor の総計 (= sum) を望みますので、reduce 演算子として dist.reduce_op.SUM を使用します。一般的に言えば、任意の可換な (= commutative) 数学演算は演算子として使用できます。創造的に、PyTorch は 4 つのそのような演算子を伴います、総て element-wise レベルで動作します :

  • dist.reduce_op.SUM,
  • dist.reduce_op.PRODUCT,
  • dist.reduce_op.MAX,
  • dist.reduce_op.MIN.

dist.all_reduce(tensor, op, group) に加えて、PyTorch で現在実装されている総計 6 つの collectives があります。

  • dist.broadcast(tensor, src, group): src からの tensor を総ての他のプロセスにコピーします。
  • dist.reduce(tensor, dst, op, group): op を総ての tensor に適用して結果を dst にストアします。
  • dist.all_reduce(tensor, op, group): reduce と同じですが、結果は総てのプロセスにストアされます。
  • dist.scatter(tensor, src, scatter_list, group): $i^{\text{th}}$ tensor scatter_list[i] を $i^{\text{th}}$ プロセスにコピーします。
  • dist.gather(tensor, dst, gather_list, group): 総てのプロセスからの tensor を dst にコピーします。
  • dist.all_gather(tensor_list, tensor, group): 総てのプロセスからの tensor を総てのプロセス上の tensor_list にコピーします。

 

分散訓練

Note: このセクションのサンプル・スクリプトは この GitHub レポジトリ で見つけることができます。

分散モジュールがどのように動作するかを理解した今、それで何か有用なものを書いてみましょう。私達の目標は DistributedDataParallel の機能のレプリカを作成することです。もちろん、これは教育的なサンプルで実際の世界の状況では公式の、良くテストされて良く最適化された (上でリンクされた) バージョンを利用するべきです。

非常に単純に確率的勾配降下の分散バージョンを実装することを望みます。私達のスクリプトは総てのプロセスにデータのバッチ上のモデルの勾配を計算させてそれからそれらの勾配を平均します。同様の収束結果を確かなものにするためにプロセスの数を変更するとき、最初にデータセットを分割しなければならないでしょう (下のスニペットの代わりに、tnt.dataset.SplitDataset を使用することもまたできます)。

""" Dataset partitioning helper """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

上のスニペットで、今では次の数行を使用して任意のデータセットを単純に分割することができます :

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 / float(size)
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

2 レプリカを持つと仮定すると、各プロセスは 60000 / 2 = 30000 サンプルの訓練セットを持ちます。全体的なバッチサイズ 128 を維持するためにレプリカの数でバッチサイズを割ることもまたできます。

今では私達は通常の forward-backward-optimize 訓練コードを書いてモデルの勾配を平均する関数呼び出しを追加することができます (次は公式 PyTorch MNIST サンプル から大きくインスパイアされています。)。

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

average_gradients(model) 関数の実装が残っています、これは単純にモデルを取り、world 全体に渡り勾配を平均します。

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size

Et voilà! 私達は分散同期 SGD を成功的に実装しました、そして巨大なコンピュータ・クラスタ上でどのようなモデルでも訓練できるでしょう。

Note: 最後のセンテンスは技術的には真実ですが、同期 SGD の製品レベルの実装を遂行するために必要な 更に多くのトリック があります。再度、テストされて最適化された ものを使用してください。

 

私達自身の Ring-Allreduce

追加の挑戦として、DeepSpeech の効率的な allreduce を実装することを望むことを想像してください。これは point-to-point collectives を使用して非常に簡単に実装されます。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
    rank = dist.get_rank()
    size = dist.get_world_size()
    send_buff = th.zeros(send.size())
    recv_buff = th.zeros(send.size())
    accum = th.zeros(send.size())
    accum[:] = send[:]

    left = ((rank - 1) + size) % size
    right = (rank + 1) % size

    for i in range(size - 1):
        if i % 2 == 0:
            # Send send_buff
            send_req = dist.isend(send_buff, right)
            dist.recv(recv_buff, left)
            accum[:] += recv[:]
        else:
            # Send recv_buff
            send_req = dist.isend(recv_buff, right)
            dist.recv(send_buff, left)
            accum[:] += send[:]
        send_req.wait()
    recv[:] = accum[:]

上のスクリプトでは、allreduce(send, recv) 関数は PyTorch のものとは僅かに異なるシグネチャを持ちます。それは recv tensor を取りそして send tensor の総計をその中にストアします。リーダーに委ねられた課題として、私達のバージョンと DeepSpeech のものはまだ 1 つの違いがあります : 彼らの実装は勾配 tensor を chunks に分割します、通信大域を最適に利用するためにです (ヒント: torch.chunk)。

 

進んだトピック

torch.distributed のより進んだ機能の幾つかを見い出す準備が今できました。カバーすべき多くのものがあるので、このセクションは 2 つのサブセクションに分割されます :

  1. 通信バックエンド: そこでは GPU-GPU 通信のために MPI と Gloo をどのように使用するかを学習します。
  2. 初期化メソッド: そこでは dist.init_process_group() において初期コーディネーション段階を どのようにベストにセットアップするかを理解します。

 

通信バックエンド

torch.distributed の最も洗練された局面は抽象して異なるバックエンド上に構築するアビリティです。前に言及したように、現在 PyTorch で実装される 3 つのバックエンドがあります : TCP, MPI, と Gloo です。それらは望まれるユースケースに依拠して各々異なる仕様とトレードオフを持ちます。サポートされる関数の比較テーブルは ここ で見つかります。

TCP バックエンド

ここまで TCP バックエンドの広範囲に使用してきました。それは開発プラットフォームとしては非常に使いやすく、殆どのマシンとオペレーティングシステム上で動作することが保証されています。それはまた CPU 上で総ての point-to-point と collective 関数をサポートします。けれども GPU のためのサポートはなくその通信ルーチンは MPI のもののようには最適化されていません。

Gloo バックエンド

Gloo バックエンド は CPU と GPU の両者に対して collective 通信手続きの最適化された実装を提供します。それは特に GPU 上で輝きます、何故ならばそれはデータを CPU のメモリに転送することなしに GPUDirect を使用して通信を遂行できるからです。それはまた高速ノード内 (= intra-node) 通信を遂行してノード内ルーチンのための 自身のアルゴリズム を実装するために NCCL を使用することも可能です。

バージョン 0.2.0 から、Gloo バックエンドは PyTorch の事前コンパイルされたバイナリで自動的に含まれています。貴方が確実に気がついているように、分散 SGD サンプルはモデルを GPU 上に置いた場合には動作しません。

最初に init_processes(rank, size, fn, backend=’tcp’) で backend=’gloo’ に置き換えることによりそれを修正しましょう。この時点で、スクリプトは依然として CPU 上で動作しますが、その裏では Gloo バックエンドを使用しています。複数 GPU を使用するためには、次の変更もまた行ないましょう :

  1. init_processes(rank, size, fn, backend=’tcp’) → init_processes(rank, size, fn, backend=’gloo’)
  2. device = torch.device(“cuda:{}”.format(rank)) model = Net() → model = Net().to(device) を使用する
  3. data, target = data.to(device), target.to(device) を使用する

上の修正で、私達のモデルは今では 2 つの GPU 上で訓練されて nvidia-smi の監視でそれらの利用をモニタできます。

MPI バックエンド

Message Passing Interface (MPI) は HPC の領域から標準化されたツールです。それは point-to-point and collective 通信を行なうことを可能にして torch.distributed の API のための主要なインスピレーションでした。MPI の幾つかの実装が存在して (e.g. Open-MPI, MVAPICH2, Intel MPI) それぞれ異なる目的のために最適化されています。MPI バックエンドを使用する優位点は巨大なコンピュータ・クラスタ上の MPI の広範囲な利用可能性 – そして高位な最適化 – にあります。幾つかの最近の実装 はまた CPU を通したメモリ・コピーを回避するために CUDA IPC と GPU Direct 技術を活用できます。

残念なことに、PyTorch のバイナリは MPI 実装を含むことができませんのでそれを手動で再コンパイルしなければならないでしょう。幸い、このプロセスは非常に単純でそれが与えられたときコンパイル時に PyTorch はそれ自身で利用可能な MPI 実装を探します。次のステップは ソースから PyTorch をインストールすることによって MPI バックエンドをインストールします。

  1. Anaconda 環境を作成して活性化し、このガイド をフォローして前要件の総てをインストールしますが、python setup.py install はまだ実行しないでください。
  2. 貴方の好みの MPI 実装を選択してインストールしてください。CUDA-aware MPI を有効にすると幾つかの追加のステップを必要とするかもしれないことに注意してください。私達のケースでは、GPU サポートなしで Open-MPI にこだわります : conda install -c conda-forge openmpi
  3. 今、クローンされた PyTorch レポジトリに行って python setup.py install 実行してください。

新たにインストールされたバックエンドをテストするために、2, 3 の変更が必要です。

  1. if __name__ == ‘__main__’: の下の内容を init_processes(0, 0, run, backend=’mpi’) で置き換える。
  2. mpirun -n 4 python myscript.py を実行する。

これらの変更のための理由は MPI はプロセスを spawn する前に自身の環境を作成する必要があるからです。MPI はまた自身のプロセスも spawn して (後述の) 初期化メソッドで記述されるハンドシェイクを遂行し、init_process_group の rank と size 引数を不要なものにします。これは実際に非常にパワフルです、何故ならば各プロセスに計算リソースを適合させるために mpirun に追加引数を渡すことができるからです。(プロセス毎のコア数のようなもの、特定の rank にマシンを手動割り当て、そして それ以上。) そのように行えば、他の通信バックエンドと同様の知られた出力を得るはずです。

 

初期化メソッド

このチュートリアルを終えるにあたり、呼び出した本当に最初の関数について話しましょう : dist.init_process_group(backend, init_method) 。特に、異なる初期化メソッドを説明します、これは各プロセス間の初期コーディネーション・ステップの責任を負います。これらのメソッドはこのコーディネーションがどのように成されるかを定義することを可能にします。貴方のハードウェア・セットアップに依拠してこれらのメソッドの一つが他よりも自然により適合するはずです。次のセクションに加えて、公式ドキュメント もまた見るべきです。

初期化メソッドに飛び込む前に、C/C++ 観点から init_process_group の裏で何が起きているかを素早く見てみましょう。

  1. 最初に引数が解析されて検証されます。
  2. backend が name2channel.at() 関数を通して解決されます。Channel クラスが返されて、データ送信を遂行するために使用されるでしょう。
  3. GIL (訳注: Global Interpreter Lock) は破棄されて、THDProcessGroupInit() が呼び出されます。これは channel をインスタンス化してマスター・ノードのアドレスを追加します。
  4. rank 0 のプロセスはマスター手続きを実行し、その一方で総ての他の rank はワーカーです。
  5. マスターは
    1. 総てのワーカーのためにソケットを作成します。
    2. 総てのワーカーが接続するのを待ちます。
    3. 他のプロセスの位置についての情報をそれらに送ります。
  6. 各ワーカーは
    • マスターへのソケットを作成します。
    • それら自身の位置情報を送ります。
    • 他のワーカーについての情報を受け取ります。
    • ソケットをオープンして総ての他のワーカーとハンドシェイクします。
    • 初期化は成され、誰もが (everyone) 誰もに (to everyone) 接続されています。

環境変数

私達はこのチュートリアルを通して環境変数初期化メソッドを使用してきました。総てのマシン上で次の 4 つの環境変数を設定することにより、総てのプロセスはマスターに正しく接続し、他のプロセスについての情報を得て、そして最後にそれらとハンドシェイクすることができます。

  • MASTER_PORT: マシンのフリー・ポート、これは rank 0 のプロセスをホストします。
  • MASTER_ADDR: マシンの IP アドレス、これは rank 0 のプロセスをホストします。
  • WORLD_SIZE: プロセスの総計数、その結果マスターは幾つのワーカーが待っているかを知ります。
  • RANK: 各プロセスの Rank、それらはそれがワーカーのマスターであるかを知ります。

共有ファイルシステム

“shared filesystem” は総てのプロセスに共有ファイルシステムへのアクセスを持つことを要求し、そして共有ファイルを通してそれらをコーディネートします。これは各プロセスがファイルをオープンし、その情報を書き、そして総てがそれを行なうまで待つことを意味します。その後総ての必要な情報が準備ができて総てのプロセスで利用可能になります。競合状態を回避するためにファイルシステムは fcntl を通してロッキングをサポートしなければなりません。rank を手動で指定することもプロセスにそれをそれら自身で見つけ出させることもできることに注意してください。ジョブ毎に一意のグループ名を定義することで複数のジョブのために同じファイルパスを使用して衝突を安全に回避することができます。

dist.init_process_group(init_method='file:///mnt/nfs/sharedfile', world_size=4,
                        group_name='mygroup')

TCP Init & マルチキャスト

TCP を通しての初期化は 2 つの異なる方法で達成されます :

  1. rank 0 を持つプロセスの IP アドレスと world サイズを提供することによって。
  2. 任意の打倒な IP マルチキャスト・アドレス と world サイズを提供することによって。

最初のケースでは、総てのワーカーは rank 0 を持つプロセスに接続することができて上で記述された手続きをフォローします。

dist.init_process_group(init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)

2 番目のケースでは、マルチキャスト・アドレスは潜在的にアクティブなノードのグループを指定します、そしてコーディネーションは上の手続きをフォローする前に各プロセスが初期ハンドシェイクを持つことを可能にすることにより処理されます。加えて TCP マルチキャスト初期化はまた (共有ファイル・メソッドと同じように) group_name 引数をサポートし同じクラスタ上で複数のジョブがスケジュールされることを可能にします。

dist.init_process_group(init_method='tcp://[ff15:1e18:5d4c:4cf0:d02d:b659:53ba:b0a7]:23456',
                        world_size=4)

 

 

以上