PyTorch Lightning 1.1 : Getting Started : 基本的な特徴

PyTorch Lightning 1.1: Getting Started : 基本的な特徴 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 02/04/2021 (1.1.x)

* 本ページは、PyTorch Lightning ドキュメントの以下のページを翻訳した上で適宜、補足説明したものです:

* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

無料セミナー実施中 クラスキャット主催 人工知能 & ビジネス Web セミナー

人工知能とビジネスをテーマにウェビナー (WEB セミナー) を定期的に開催しています。スケジュールは弊社 公式 Web サイト でご確認頂けます。
  • お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
  • Windows PC のブラウザからご参加が可能です。スマートデバイスもご利用可能です。
クラスキャットは人工知能・テレワークに関する各種サービスを提供しております :

人工知能研究開発支援 人工知能研修サービス テレワーク & オンライン授業を支援
PoC(概念実証)を失敗させないための支援 (本支援はセミナーに参加しアンケートに回答した方を対象としています。)

お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。

株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション
E-Mail:sales-info@classcat.com ; WebSite: https://www.classcat.com/
Facebook: https://www.facebook.com/ClassCatJP/

 

Getting Started : 基本的な特徴

基本的な特徴

手動 vs 自動最適化

自動最適化

Lightning では、training_step から装着されたグラフによる損失を返す限りは、いつ grads を有効/無効にするか、backward パスを行なうか、あるいは optimizer を最適化するかについて心配する必要はありません、Lightning が最適化を自動化します。

def training_step(self, batch, batch_idx):
    loss = self.encoder(batch[0])
    return loss

 

手動最適化

けれども、GAN、強化学習、あるいは複数の optimizer や inner ループを持つ何かのような特定の研究については、自動最適化を無効にして訓練ループを貴方自身で完全に制御することができます。

最初に、自動最適化を無効にします :

trainer = Trainer(automatic_optimization=False)

今は貴方は訓練ループを所有しています!

def training_step(self, batch, batch_idx, opt_idx):
    # access your optimizers with use_pl_optimizer=False. Default is True
    (opt_a, opt_b, opt_c) = self.optimizers(use_pl_optimizer=True)

    loss_a = self.generator(batch[0])

    # use this instead of loss.backward so we can automate half precision, etc...
    self.manual_backward(loss_a, opt_a, retain_graph=True)
    self.manual_backward(loss_a, opt_a)
    opt_a.step()
    opt_a.zero_grad()

    loss_b = self.discriminator(batch[0])
    self.manual_backward(loss_b, opt_b)
    ...

 

予測 or 配備する

訓練を終えたとき、予測のために LightningModule を利用するために 3 つのオプションがあります。

 

オプション 1 : サブモデル

予測のためにシステム内の任意のモデルを引き出します。

# ----------------------------------
# to use as embedding extractor
# ----------------------------------
autoencoder = LitAutoEncoder.load_from_checkpoint('path/to/checkpoint_file.ckpt')
encoder_model = autoencoder.encoder
encoder_model.eval()

# ----------------------------------
# to use as image generator
# ----------------------------------
decoder_model = autoencoder.decoder
decoder_model.eval()

 

オプション 2 : Forward

貴方が望む方法で予測を行なうために forward メソッドを追加することもできます。

# ----------------------------------
# using the AE to extract embeddings
# ----------------------------------
class LitAutoEncoder(pl.LightningModule):
    def forward(self, x):
        embedding = self.encoder(x)
        return embedding

autoencoder = LitAutoencoder()
autoencoder = autoencoder(torch.rand(1, 28 * 28))
# ----------------------------------
# or using the AE to generate images
# ----------------------------------
class LitAutoEncoder(pl.LightningModule):
    def forward(self):
        z = torch.rand(1, 3)
        image = self.decoder(z)
        image = image.view(1, 1, 28, 28)
        return image

autoencoder = LitAutoencoder()
image_sample = autoencoder()

 

オプション 3 : プロダクション

プロダクション・システムのためには、onnx や torchscript が遥かに高速です。forward メソッドを追加したり必要なサブモデルだけをトレースすることを確実にしてください。

# ----------------------------------
# torchscript
# ----------------------------------
autoencoder = LitAutoEncoder()
torch.jit.save(autoencoder.to_torchscript(), "model.pt")
os.path.isfile("model.pt")
# ----------------------------------
# onnx
# ----------------------------------
with tempfile.NamedTemporaryFile(suffix='.onnx', delete=False) as tmpfile:
     autoencoder = LitAutoEncoder()
     input_sample = torch.randn((1, 28 * 28))
     autoencoder.to_onnx(tmpfile.name, input_sample, export_params=True)
     os.path.isfile(tmpfile.name)

 

CPU/GPU/TPU を使用する

Lightning で CPU, GPU or TPU を使用することは自明です。貴方のコードを変更する 必要はありません、単に Trainer オプションを変更します。

# train on CPU
trainer = pl.Trainer()
# train on 8 CPUs
trainer = pl.Trainer(num_processes=8)
# train on 1024 CPUs across 128 machines
trainer = pl.Trainer(
    num_processes=8,
    num_nodes=128
)
# train on 1 GPU
trainer = pl.Trainer(gpus=1)
# train on multiple GPUs across nodes (32 gpus here)
trainer = pl.Trainer(
    gpus=4,
    num_nodes=8
)
# train on gpu 1, 3, 5 (3 gpus total)
trainer = pl.Trainer(gpus=[1, 3, 5])
# Multi GPU with mixed precision
trainer = pl.Trainer(gpus=2, precision=16)
# Train on TPUs
trainer = pl.Trainer(tpu_cores=8)

貴方のコードの シングル 行を変更することなく、今では上のコードで次を行なうことができます :

# train on TPUs using 16 bit precision
# using only half the training data and checking validation every quarter of a training epoch
trainer = pl.Trainer(
    tpu_cores=8,
    precision=16,
    limit_train_batches=0.5,
    val_check_interval=0.25
)

 

チェックポイント

Lightning はモデルを自動的にセーブします。ひとたび訓練したのであれば、次のようにチェックポイントをロードできます :

model = LitModel.load_from_checkpoint(path)

上のチェックポイントはモデルを初期化して state dict を設定するために必要な総ての引数を含みます。それを手動で行なうことを好むのであれば、ここに同値があります :

# load the ckpt
ckpt = torch.load('path/to/checkpoint.ckpt')

# equivalent to the above
model = LitModel()
model.load_state_dict(ckpt['state_dict'])

 

データフロー

各ループ (訓練、検証、テスト) は貴方が実装できる 3 つのフックを持ちます :

  • x_step
  • x_step_end
  • x_epoch_end

データがどのようにフローするかを示すために、訓練ループを使用します (ie: x=training) :

outs = []
for batch in data:
    out = training_step(batch)
    outs.append(out)
training_epoch_end(outs)

Lightning の同値は :

def training_step(self, batch, batch_idx):
    prediction = ...
    return prediction

def training_epoch_end(self, training_step_outputs):
    for prediction in predictions:
        # do something with these

DP や DDP2 分散モードを使用するイベントでは (ie: GPU に渡りバッチを分割する)、手動で集めるために x_step_end を利用します (あるいは lightning に自動集積させるためにはそれを実装しません)。

for batch in data:
    model_copies = copy_model_per_gpu(model, num_gpus)
    batch_split = split_batch_per_gpu(batch, num_gpus)

    gpu_outs = []
    for model, batch_part in zip(model_copies, batch_split):
        # LightningModule hook
        gpu_out = model.training_step(batch_part)
        gpu_outs.append(gpu_out)

    # LightningModule hook
    out = training_step_end(gpu_outs)

lightning の等値は :

def training_step(self, batch, batch_idx):
    loss = ...
    return loss

def training_step_end(self, losses):
    gpu_0_loss = losses[0]
    gpu_1_loss = losses[1]
    return (gpu_0_loss + gpu_1_loss) * 1/2

TIP : 検証とテストループは同じ構造を持ちます。

 

ロギング

Tensorboard、貴方のお気に入りのロガー、and/or 進捗バーにロギングするためには、log() メソッドを使用します、これは LightningModule の任意のメソッドから呼び出すことができます。

def training_step(self, batch, batch_idx):
    self.log('my_metric', x)

log() メソッドは幾つかのオプションを持ちます :

  • on_step (訓練のそのステップでメトリックをロギングする)
  • on_epoch (自動的に累算してエポックの終わりにロギングする)
  • prog_bar (進捗バーにロギングする)
  • logger (TensorBoard のようなロガーにロギングする)

log がどこから呼び出されるかに依拠して、Lightning は正しいモードを自動決定します。しかしもちろんフラグを手動で設定することによりデフォルト動作を override することができます。

NOTE : on_epoch=True を設定すると full の訓練エポックに渡りロギングされる値を累算します。

def training_step(self, batch, batch_idx):
    self.log('my_loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)

NOTE : 進捗バーで示される損失値は最後の値(s) に渡り滑らかに (平均) されますので、それは訓練/検証ステップで返される実際の値とは異なります。

貴方のロガーの任意のメソッドを直接利用することもできます :

def training_step(self, batch, batch_idx):
    tensorboard = self.logger.experiment
    tensorboard.any_summary_writer_method_you_want())

ひとたび訓練が始まれば、お気に入りのロガーや Tensorboard logs ををブートアップすることによりログを見ることができます :

tensorboard --logdir ./lightning_logs

NOTE : Lightning は training_step から返された損失値を進捗バーで自動的に示します。そのため、この self.log(‘loss’, loss, prog_bar=True) のような明示的なロギングは必要ありません。

ロガー についてより多く読んでください。

 

オプションの拡張

コールバック

コールバックは任意の自己充足的プログラムで訓練ループの任意の部分で実行できます。

ここにそれほどおかしくはない (= not-so-fancy) 学習率減衰ルールを追加するサンプルがあります :

class DecayLearningRate(pl.callbacks.Callback):

    def __init__(self):
        self.old_lrs = []

    def on_train_start(self, trainer, pl_module):
        # track the initial learning rates
        for opt_idx, optimizer in enumerate(trainer.optimizers):
            group = [param_group['lr'] for param_group in optimizer.param_groups]
            self.old_lrs.append(group)

    def on_train_epoch_end(self, trainer, pl_module, outputs):
        for opt_idx, optimizer in enumerate(trainer.optimizers):
            old_lr_group = self.old_lrs[opt_idx]
            new_lr_group = []
            for p_idx, param_group in enumerate(optimizer.param_groups):
                old_lr = old_lr_group[p_idx]
                new_lr = old_lr * 0.98
                new_lr_group.append(new_lr)
                param_group['lr'] = new_lr
            self.old_lrs[opt_idx] = new_lr_group

そしてコールバックを Trainer に渡します。

decay_callback = DecayLearningRate()
trainer = Trainer(callbacks=[decay_callback])

コールバックで行えることは :

  • 訓練のある点で電子メールを送る
  • モデルを増大する
  • 学習率を更新する
  • 勾配を可視化する
  • 貴方の想像力によってのみ制限されます。

カスタム・コールバックについて更に学習します

 

LightningDataModule

DataLoader とデータ処理コードは散財して終わる傾向があります。データコードを LightningDataModule に体系化することにより再利用可能にします。

class MNISTDataModule(pl.LightningDataModule):

      def __init__(self, batch_size=32):
          super().__init__()
          self.batch_size = batch_size

      # When doing distributed training, Datamodules have two optional arguments for
      # granular control over download/prepare/splitting data:

      # OPTIONAL, called only on 1 GPU/machine
      def prepare_data(self):
          MNIST(os.getcwd(), train=True, download=True)
          MNIST(os.getcwd(), train=False, download=True)

      # OPTIONAL, called for every GPU/machine (assigning state is OK)
      def setup(self, stage):
          # transforms
          transform=transforms.Compose([
              transforms.ToTensor(),
              transforms.Normalize((0.1307,), (0.3081,))
          ])
          # split dataset
          if stage == 'fit':
              mnist_train = MNIST(os.getcwd(), train=True, transform=transform)
              self.mnist_train, self.mnist_val = random_split(mnist_train, [55000, 5000])
          if stage == 'test':
              self.mnist_test = MNIST(os.getcwd(), train=False, transform=transform)

      # return the dataloader for each split
      def train_dataloader(self):
          mnist_train = DataLoader(self.mnist_train, batch_size=self.batch_size)
          return mnist_train

      def val_dataloader(self):
          mnist_val = DataLoader(self.mnist_val, batch_size=self.batch_size)
          return mnist_val

      def test_dataloader(self):
          mnist_test = DataLoader(self.mnist_test, batch_size=self.batch_size)
          return mnist_test

LightningDataModule は異なるプロジェクトに渡りデータ分割と変換を共有して再利用することを可能にするように設計されています。それはデータを処理するために必要な総てのステップをカプセル化します : ダウンロード、トークン化、前処理 etc.

今は LightningDataModule を単純に Trainer に渡すことができます :

# init model
model = LitModel()

# init data
dm = MNISTDataModule()

# train
trainer = pl.Trainer()
trainer.fit(model, dm)

# test
trainer.test(datamodule=dm)

DataModule はデータに基づいてモデルを構築するために特に有用です。LightningDataModule について更に読んでください。

 

デバッグ

Lightning はデバッグのための多くのツールを持ちます。ここにそれらの単に幾つかのサンプルがあります :

# use only 10 train batches and 3 val batches
trainer = pl.Trainer(limit_train_batches=10, limit_val_batches=3)
# Automatically overfit the sane batch of your model for a sanity test
trainer = pl.Trainer(overfit_batches=1)
# unit test all the code- hits every line of your code once to see if you have bugs,
# instead of waiting hours to crash on validation
trainer = pl.Trainer(fast_dev_run=True)
# train only 20% of an epoch
trainer = pl.Trainer(limit_train_batches=0.2)
# run validation every 25% of a training epoch
trainer = pl.Trainer(val_check_interval=0.25)
# Profile your code to find speed/memory bottlenecks
Trainer(profiler=True)

 

他のクールな特徴

貴方の最初の Lightning モデルをひとたび定義して訓練すれば、以下のような他のクールな特徴を試すことを望むかもしれません :

あるいは更に学習するために Step-by-step walk-through を読んでください!

 

以上