A Deep Learning Implementation Guide for Advanced Practitioners: Transformers, CNNs, and Optimization Techniques in PyTorch

This article provides a comprehensive walkthrough of advanced deep learning implementation in PyTorch, from theoretical background through implementation details to performance optimization. By building a Transformer from scratch, studying advanced CNN design patterns, and working through the underlying mathematics, you will develop the skills needed to build production-grade AI systems.
Table of Contents
- Theoretical Foundations and Mathematical Background
- A Complete Transformer Architecture Implementation
- Advanced CNN Architecture Design
- Performance Optimization and Memory Efficiency
- Distributed Training and Large-Scale Models
- Practical Training Techniques
- Summary

Theoretical Foundations and Mathematical Background
Mathematical Formulation of the Attention Mechanism
This section builds up the attention mechanism, the core of the Transformer, from its mathematical foundations.
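Before turning to the implementation, it is worth writing out the multi-head form explicitly, since the module below folds all h heads into a single set of projection matrices (with d_k = d_model / h):

Attention(Q, K, V) = softmax(QK^T / √d_k) V
MultiHead(Q, K, V) = Concat(head_1, ..., head_h) W^O,   where head_i = Attention(Q W_i^Q, K W_i^K, V W_i^V)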
import math
from typing import Optional, Tuple

import numpy as np               # used by the MixUp / CutMix augmentations below

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed         # used by DistributedTrainer below
import torch.utils.checkpoint    # used by GradientCheckpointing below
class ScaledDotProductAttention(nn.Module):
"""
Scaled Dot-Product Attention の完全実装
数学的定式化:
Attention(Q, K, V) = softmax(QK^T / √d_k)V
Args:
d_model: モデル次元
n_heads: マルチヘッド数
dropout: ドロップアウト率
"""
def __init__(self, d_model: int, n_heads: int, dropout: float = 0.1):
super().__init__()
assert d_model % n_heads == 0
self.d_model = d_model
self.n_heads = n_heads
self.d_k = d_model // n_heads
self.scaling_factor = math.sqrt(self.d_k)
        # Linear projection layers for queries, keys, values, and the output
        self.w_q = nn.Linear(d_model, d_model, bias=False)
        self.w_k = nn.Linear(d_model, d_model, bias=False)
        self.w_v = nn.Linear(d_model, d_model, bias=False)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        # Xavier initialization
        self._init_weights()

    def _init_weights(self):
        """Initialize the projection weights with Xavier uniform."""
        for module in [self.w_q, self.w_k, self.w_v, self.w_o]:
            nn.init.xavier_uniform_(module.weight)
def forward(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
mask: Optional[torch.Tensor] = None,
return_attention: bool = False
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""
Args:
query: [batch_size, seq_len, d_model]
key: [batch_size, seq_len, d_model]
value: [batch_size, seq_len, d_model]
mask: [batch_size, seq_len, seq_len]
Returns:
output: [batch_size, seq_len, d_model]
attention_weights: [batch_size, n_heads, seq_len, seq_len]
"""
        batch_size, q_len = query.size(0), query.size(1)
        k_len = key.size(1)  # may differ from q_len in cross-attention

        # 1. Linear projections and split into heads
        Q = self.w_q(query).view(batch_size, q_len, self.n_heads, self.d_k).transpose(1, 2)
        K = self.w_k(key).view(batch_size, k_len, self.n_heads, self.d_k).transpose(1, 2)
        V = self.w_v(value).view(batch_size, k_len, self.n_heads, self.d_k).transpose(1, 2)

        # 2. Scaled dot-product attention scores
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scaling_factor

        # 3. Apply the mask (use the dtype's minimum so fp16/autocast does not overflow)
        if mask is not None:
            mask = mask.unsqueeze(1).expand(-1, self.n_heads, -1, -1)
            attention_scores = attention_scores.masked_fill(
                mask == 0, torch.finfo(attention_scores.dtype).min
            )

        # 4. Softmax over the key dimension
        attention_weights = F.softmax(attention_scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        # 5. Weighted sum of the values
        context = torch.matmul(attention_weights, V)

        # 6. Concatenate heads and apply the output projection
        context = context.transpose(1, 2).contiguous().view(
            batch_size, q_len, self.d_model
        )
        output = self.w_o(context)

        if return_attention:
            return output, attention_weights
        return output, None
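As a quick sanity check, the module can be exercised with random tensors. The values below are illustrative; only the relationships between the shapes matter.

# Minimal shape check for the attention module (illustrative sizes).
attn = ScaledDotProductAttention(d_model=512, n_heads=8)
q = k = v = torch.randn(2, 10, 512)          # [batch, seq_len, d_model]
mask = torch.ones(2, 10, 10)                 # 1 = attend, 0 = masked
out, weights = attn(q, k, v, mask, return_attention=True)
print(out.shape)      # torch.Size([2, 10, 512])
print(weights.shape)  # torch.Size([2, 8, 10, 10])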
An Advanced Positional Encoding Implementation
class AdvancedPositionalEncoding(nn.Module):
"""
高度な位置エンコーディング実装
数学的定式化:
PE(pos, 2i) = sin(pos / 10000^(2i/d_model))
PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model))
"""
def __init__(self, d_model: int, max_len: int = 5000, dropout: float = 0.1):
super().__init__()
self.d_model = d_model
self.dropout = nn.Dropout(dropout)
        # Precompute the positional encoding matrix
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Even dimensions use sin, odd dimensions use cos
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Args:
x: [seq_len, batch_size, d_model]
Returns:
            Tensor with positional encodings added
"""
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
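Note that this module expects the sequence dimension first ([seq_len, batch, d_model]); a minimal usage sketch:

# Positional encoding operates on sequence-first tensors.
pos_enc = AdvancedPositionalEncoding(d_model=512)
tokens = torch.randn(10, 2, 512)   # [seq_len, batch, d_model]
encoded = pos_enc(tokens)
print(encoded.shape)               # torch.Size([10, 2, 512])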
A Complete Transformer Architecture Implementation
Implementing the Encoder Layer
class TransformerEncoderLayer(nn.Module):
"""
Transformer Encoder層の完全実装
構成要素:
1. Multi-Head Self-Attention
2. Position-wise Feed-Forward Network
3. Layer Normalization & Residual Connection
"""
def __init__(
self,
d_model: int,
n_heads: int,
d_ff: int,
dropout: float = 0.1,
layer_norm_eps: float = 1e-5
):
super().__init__()
self.self_attention = ScaledDotProductAttention(d_model, n_heads, dropout)
# Position-wise FFN
self.ffn = nn.Sequential(
nn.Linear(d_model, d_ff),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_ff, d_model)
)
# Layer Normalization
self.norm1 = nn.LayerNorm(d_model, eps=layer_norm_eps)
self.norm2 = nn.LayerNorm(d_model, eps=layer_norm_eps)
self.dropout = nn.Dropout(dropout)
def forward(
self,
x: torch.Tensor,
mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
"""
Args:
x: [batch_size, seq_len, d_model]
mask: [batch_size, seq_len, seq_len]
"""
# 1. Multi-Head Self-Attention + Residual + LayerNorm
attn_output, _ = self.self_attention(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_output))
# 2. Position-wise FFN + Residual + LayerNorm
ffn_output = self.ffn(x)
x = self.norm2(x + self.dropout(ffn_output))
return x
class TransformerEncoder(nn.Module):
"""完全なTransformer Encoder"""
def __init__(
self,
vocab_size: int,
d_model: int = 512,
n_heads: int = 8,
n_layers: int = 6,
d_ff: int = 2048,
max_len: int = 5000,
dropout: float = 0.1
):
super().__init__()
self.d_model = d_model
        # Token embedding
self.embedding = nn.Embedding(vocab_size, d_model)
self.pos_encoding = AdvancedPositionalEncoding(d_model, max_len, dropout)
        # Stack of encoder layers
self.layers = nn.ModuleList([
TransformerEncoderLayer(d_model, n_heads, d_ff, dropout)
for _ in range(n_layers)
])
self.dropout = nn.Dropout(dropout)
def forward(
self,
src: torch.Tensor,
src_mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
"""
Args:
src: [batch_size, seq_len]
src_mask: [batch_size, seq_len, seq_len]
"""
        # Embedding and positional encoding
x = self.embedding(src) * math.sqrt(self.d_model)
x = x.transpose(0, 1) # [seq_len, batch_size, d_model]
x = self.pos_encoding(x)
x = x.transpose(0, 1) # [batch_size, seq_len, d_model]
        # Pass through the encoder layers in order
for layer in self.layers:
x = layer(x, src_mask)
return x
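A minimal usage sketch follows. The padding mask assumes a hypothetical pad token id of 0; adapt it to your tokenizer.

# Encode a batch of token ids; pad_id = 0 is an assumption for illustration.
encoder = TransformerEncoder(vocab_size=10000, d_model=512, n_heads=8, n_layers=2)
src = torch.randint(1, 10000, (2, 16))        # [batch, seq_len]
src[:, 12:] = 0                               # pretend the tail is padding
pad_mask = (src != 0).unsqueeze(1).expand(-1, src.size(1), -1)  # [batch, seq_len, seq_len]
memory = encoder(src, pad_mask)
print(memory.shape)                           # torch.Size([2, 16, 512])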
Decoder Layers and the Complete Transformer Model
class TransformerDecoderLayer(nn.Module):
"""Transformer Decoder層"""
def __init__(self, d_model: int, n_heads: int, d_ff: int, dropout: float = 0.1):
super().__init__()
self.self_attention = ScaledDotProductAttention(d_model, n_heads, dropout)
self.cross_attention = ScaledDotProductAttention(d_model, n_heads, dropout)
self.ffn = nn.Sequential(
nn.Linear(d_model, d_ff),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(d_ff, d_model)
)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(
self,
tgt: torch.Tensor,
memory: torch.Tensor,
tgt_mask: Optional[torch.Tensor] = None,
memory_mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
# 1. Masked Self-Attention
self_attn, _ = self.self_attention(tgt, tgt, tgt, tgt_mask)
tgt = self.norm1(tgt + self.dropout(self_attn))
# 2. Cross-Attention with Encoder output
cross_attn, _ = self.cross_attention(tgt, memory, memory, memory_mask)
tgt = self.norm2(tgt + self.dropout(cross_attn))
# 3. Position-wise FFN
ffn_output = self.ffn(tgt)
tgt = self.norm3(tgt + self.dropout(ffn_output))
return tgt
class CompleteTransformer(nn.Module):
"""完全なTransformerモデル(Encoder-Decoder)"""
def __init__(
self,
src_vocab_size: int,
tgt_vocab_size: int,
d_model: int = 512,
n_heads: int = 8,
n_layers: int = 6,
d_ff: int = 2048,
max_len: int = 5000,
dropout: float = 0.1
):
super().__init__()
# Encoder
self.encoder = TransformerEncoder(
src_vocab_size, d_model, n_heads, n_layers, d_ff, max_len, dropout
)
# Decoder
self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)
self.pos_encoding = AdvancedPositionalEncoding(d_model, max_len, dropout)
self.decoder_layers = nn.ModuleList([
TransformerDecoderLayer(d_model, n_heads, d_ff, dropout)
for _ in range(n_layers)
])
        # Output projection layer
self.output_projection = nn.Linear(d_model, tgt_vocab_size)
self.d_model = d_model
self.dropout = nn.Dropout(dropout)
def forward(
self,
src: torch.Tensor,
tgt: torch.Tensor,
src_mask: Optional[torch.Tensor] = None,
tgt_mask: Optional[torch.Tensor] = None
) -> torch.Tensor:
"""
        Full forward pass.
Args:
src: [batch_size, src_len]
tgt: [batch_size, tgt_len]
src_mask: [batch_size, src_len, src_len]
tgt_mask: [batch_size, tgt_len, tgt_len]
"""
# Encode
memory = self.encoder(src, src_mask)
# Decode
tgt_emb = self.tgt_embedding(tgt) * math.sqrt(self.d_model)
tgt_emb = tgt_emb.transpose(0, 1)
tgt_emb = self.pos_encoding(tgt_emb)
tgt_emb = tgt_emb.transpose(0, 1)
decoder_output = tgt_emb
for layer in self.decoder_layers:
decoder_output = layer(decoder_output, memory, tgt_mask, src_mask)
        # Project to vocabulary logits
output = self.output_projection(decoder_output)
return output
    def generate_square_subsequent_mask(self, sz: int) -> torch.Tensor:
        """Generate a causal (subsequent-position) mask for the decoder.

        Returns a [sz, sz] mask with 1s on and below the diagonal and 0s above it,
        matching the attention module's convention that 0 marks blocked positions.
        """
        return torch.tril(torch.ones(sz, sz, dtype=torch.bool))
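A small end-to-end shape check, using the causal mask helper for the decoder side (the vocabulary sizes and sequence lengths are illustrative):

# End-to-end shape check with a causal mask on the decoder input.
model = CompleteTransformer(src_vocab_size=8000, tgt_vocab_size=8000, n_layers=2)
src = torch.randint(0, 8000, (2, 12))                         # [batch, src_len]
tgt = torch.randint(0, 8000, (2, 10))                         # [batch, tgt_len]
causal = model.generate_square_subsequent_mask(tgt.size(1))   # [tgt_len, tgt_len]
tgt_mask = causal.unsqueeze(0).expand(tgt.size(0), -1, -1)    # [batch, tgt_len, tgt_len]
logits = model(src, tgt, src_mask=None, tgt_mask=tgt_mask)
print(logits.shape)                                           # torch.Size([2, 10, 8000])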
Advanced CNN Architecture Design
ResNet with Advanced Techniques
class AdvancedResidualBlock(nn.Module):
"""
高度なResidualブロック実装
改良点:
- Pre-activation design
- Squeeze-and-Excitation attention
- Stochastic depth
- Anti-alias downsampling
"""
def __init__(
self,
in_channels: int,
out_channels: int,
stride: int = 1,
se_ratio: float = 0.25,
drop_path_rate: float = 0.0,
anti_alias: bool = True
):
super().__init__()
self.stride = stride
self.drop_path_rate = drop_path_rate
# Pre-activation design
self.bn1 = nn.BatchNorm2d(in_channels)
self.conv1 = nn.Conv2d(in_channels, out_channels // 4, 1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels // 4)
# Anti-aliasing convolution
if stride > 1 and anti_alias:
self.conv2 = nn.Conv2d(out_channels // 4, out_channels // 4, 3,
padding=1, bias=False)
self.pool = nn.AvgPool2d(kernel_size=2, stride=2)
else:
self.conv2 = nn.Conv2d(out_channels // 4, out_channels // 4, 3,
stride=stride, padding=1, bias=False)
self.pool = None
self.bn3 = nn.BatchNorm2d(out_channels // 4)
self.conv3 = nn.Conv2d(out_channels // 4, out_channels, 1, bias=False)
# Squeeze-and-Excitation
self.se = SEBlock(out_channels, int(out_channels * se_ratio))
# Shortcut connection
if stride != 1 or in_channels != out_channels:
self.shortcut = nn.Sequential(
nn.BatchNorm2d(in_channels),
nn.Conv2d(in_channels, out_channels, 1, stride=stride, bias=False)
)
else:
self.shortcut = nn.Identity()
self.relu = nn.ReLU(inplace=True)
def forward(self, x: torch.Tensor) -> torch.Tensor:
identity = x
# Pre-activation
out = self.relu(self.bn1(x))
out = self.conv1(out)
out = self.relu(self.bn2(out))
out = self.conv2(out)
if self.pool is not None:
out = self.pool(out)
out = self.relu(self.bn3(out))
out = self.conv3(out)
# SE attention
out = self.se(out)
# Shortcut
identity = self.shortcut(identity)
        # Stochastic depth: during training, randomly skip the whole residual branch
        # with probability drop_path_rate (a simplified variant without rescaling)
        if self.training and self.drop_path_rate > 0:
            if torch.rand(1) < self.drop_path_rate:
                return identity
out = out + identity
return out
class SEBlock(nn.Module):
"""Squeeze-and-Excitation Block"""
def __init__(self, channels: int, reduction: int):
super().__init__()
self.avg_pool = nn.AdaptiveAvgPool2d(1)
self.fc = nn.Sequential(
nn.Linear(channels, reduction, bias=False),
nn.ReLU(inplace=True),
nn.Linear(reduction, channels, bias=False),
nn.Sigmoid()
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
b, c, _, _ = x.size()
y = self.avg_pool(x).view(b, c)
y = self.fc(y).view(b, c, 1, 1)
return x * y.expand_as(x)
class AdvancedResNet(nn.Module):
"""高度なResNetアーキテクチャ"""
def __init__(
self,
layers: list,
num_classes: int = 1000,
se_ratio: float = 0.25,
drop_path_rate: float = 0.1
):
super().__init__()
self.in_channels = 64
# Stem
self.stem = nn.Sequential(
nn.Conv2d(3, 32, 3, stride=2, padding=1, bias=False),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.Conv2d(32, 32, 3, padding=1, bias=False),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.Conv2d(32, 64, 3, padding=1, bias=False),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.MaxPool2d(3, stride=2, padding=1)
)
# Stochastic depth schedule
total_blocks = sum(layers)
drop_rates = [x.item() for x in torch.linspace(0, drop_path_rate, total_blocks)]
# Residual layers
self.layer1 = self._make_layer(256, layers[0], stride=1,
drop_rates=drop_rates[0:layers[0]], se_ratio=se_ratio)
self.layer2 = self._make_layer(512, layers[1], stride=2,
drop_rates=drop_rates[layers[0]:layers[0]+layers[1]],
se_ratio=se_ratio)
self.layer3 = self._make_layer(1024, layers[2], stride=2,
drop_rates=drop_rates[layers[0]+layers[1]:layers[0]+layers[1]+layers[2]],
se_ratio=se_ratio)
self.layer4 = self._make_layer(2048, layers[3], stride=2,
drop_rates=drop_rates[layers[0]+layers[1]+layers[2]:],
se_ratio=se_ratio)
# Head
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(2048, num_classes)
self._init_weights()
def _make_layer(self, out_channels: int, blocks: int, stride: int,
drop_rates: list, se_ratio: float):
layers = []
layers.append(AdvancedResidualBlock(
self.in_channels, out_channels, stride, se_ratio, drop_rates[0]
))
self.in_channels = out_channels
for i in range(1, blocks):
layers.append(AdvancedResidualBlock(
self.in_channels, out_channels, 1, se_ratio, drop_rates[i]
))
return nn.Sequential(*layers)
def _init_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.stem(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
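A quick sketch to verify the architecture end to end (the [3, 4, 6, 3] block counts mirror ResNet-50; batch and image sizes are illustrative):

# Build a ResNet-50-style variant and run a dummy batch through it.
model = AdvancedResNet(layers=[3, 4, 6, 3], num_classes=1000)
images = torch.randn(2, 3, 224, 224)
logits = model(images)
print(logits.shape)                                    # torch.Size([2, 1000])
print(sum(p.numel() for p in model.parameters()))      # total parameter count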
Performance Optimization and Memory Efficiency
Mixed Precision Training
class MixedPrecisionTrainer:
"""
混合精度学習の実装
利点:
- メモリ使用量削減
- 学習速度向上
- 数値安定性の保持
"""
def __init__(self, model: nn.Module, optimizer: torch.optim.Optimizer):
self.model = model
self.optimizer = optimizer
self.scaler = torch.cuda.amp.GradScaler()
def train_step(self, inputs: torch.Tensor, targets: torch.Tensor) -> float:
"""1ステップの学習"""
self.optimizer.zero_grad()
# 自動混合精度での順伝播
with torch.cuda.amp.autocast():
outputs = self.model(inputs)
loss = F.cross_entropy(outputs, targets)
# スケールされた勾配計算
self.scaler.scale(loss).backward()
# 勾配クリッピング(オプション)
self.scaler.unscale_(self.optimizer)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
# オプティマイザステップ
self.scaler.step(self.optimizer)
self.scaler.update()
return loss.item()
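A minimal wiring sketch, assuming a CUDA device is available (the model, batch shapes, and class count are placeholders):

# Illustrative wiring only; assumes a CUDA device and a classification model.
model = AdvancedResNet(layers=[2, 2, 2, 2], num_classes=10).cuda()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
trainer = MixedPrecisionTrainer(model, optimizer)

inputs = torch.randn(8, 3, 224, 224, device='cuda')
targets = torch.randint(0, 10, (8,), device='cuda')
loss = trainer.train_step(inputs, targets)
print(f'loss: {loss:.4f}')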
class GradientCheckpointing(nn.Module):
"""
勾配チェックポイントを使用したメモリ効率的なモデル
メモリ使用量をO(√n)に削減(nは層数)
"""
def __init__(self, layers: nn.ModuleList):
super().__init__()
self.layers = layers
def forward(self, x: torch.Tensor) -> torch.Tensor:
def create_custom_forward(module):
def custom_forward(*inputs):
return module(*inputs)
return custom_forward
        # Checkpoint each layer during training; run layers normally at inference time
        for layer in self.layers:
            if self.training:
                x = torch.utils.checkpoint.checkpoint(
                    create_custom_forward(layer), x,
                    use_reentrant=False,  # non-reentrant variant recommended on recent PyTorch
                )
            else:
                x = layer(x)
return x
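A usage sketch that wraps the encoder layers defined earlier; the input must require gradients so the recomputation path is exercised in the backward pass.

# Wrap a stack of encoder layers so intermediate activations are recomputed
# in the backward pass instead of being stored.
layers = nn.ModuleList([
    TransformerEncoderLayer(d_model=512, n_heads=8, d_ff=2048) for _ in range(6)
])
checkpointed = GradientCheckpointing(layers)
x = torch.randn(2, 16, 512, requires_grad=True)
out = checkpointed(x)
out.sum().backward()
print(x.grad.shape)   # torch.Size([2, 16, 512])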
Advanced Optimization Techniques
class AdamWWithWarmup(torch.optim.Optimizer):
"""
ウォームアップとコサインアニーリングを含むAdamW最適化器
"""
def __init__(
self,
params,
lr: float = 1e-3,
betas: Tuple[float, float] = (0.9, 0.999),
eps: float = 1e-8,
weight_decay: float = 0.01,
warmup_steps: int = 1000,
total_steps: int = 10000
):
defaults = dict(lr=lr, betas=betas, eps=eps, weight_decay=weight_decay)
super().__init__(params, defaults)
self.warmup_steps = warmup_steps
self.total_steps = total_steps
self.base_lrs = [group['lr'] for group in self.param_groups]
self.step_count = 0
def step(self, closure=None):
loss = None
if closure is not None:
loss = closure()
self.step_count += 1
        # Learning-rate scheduling: linear warmup, then cosine annealing
        if self.step_count <= self.warmup_steps:
            # Warmup phase
            lr_scale = min(1.0, self.step_count / self.warmup_steps)
        else:
            # Cosine annealing phase
            progress = (self.step_count - self.warmup_steps) / (self.total_steps - self.warmup_steps)
            lr_scale = 0.5 * (1 + math.cos(math.pi * progress))
for group, base_lr in zip(self.param_groups, self.base_lrs):
group['lr'] = base_lr * lr_scale
for p in group['params']:
if p.grad is None:
continue
grad = p.grad.data
if grad.is_sparse:
raise RuntimeError('AdamW does not support sparse gradients')
state = self.state[p]
                # Lazy state initialization
if len(state) == 0:
state['step'] = 0
state['exp_avg'] = torch.zeros_like(p.data)
state['exp_avg_sq'] = torch.zeros_like(p.data)
exp_avg, exp_avg_sq = state['exp_avg'], state['exp_avg_sq']
beta1, beta2 = group['betas']
state['step'] += 1
                # Decoupled weight decay, applied directly to the parameters
p.data.add_(p.data, alpha=-group['weight_decay'] * group['lr'])
                # Update the exponential moving averages
exp_avg.mul_(beta1).add_(grad, alpha=1 - beta1)
exp_avg_sq.mul_(beta2).addcmul_(grad, grad, value=1 - beta2)
                # Bias correction
bias_correction1 = 1 - beta1 ** state['step']
bias_correction2 = 1 - beta2 ** state['step']
step_size = group['lr'] / bias_correction1
bias_correction2_sqrt = math.sqrt(bias_correction2)
                # Parameter update
denom = (exp_avg_sq.sqrt() / bias_correction2_sqrt).add_(group['eps'])
p.data.addcdiv_(exp_avg, denom, value=-step_size)
return loss
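To see the schedule itself, it is enough to drive the optimizer with a single dummy parameter and record the learning rate; the values follow directly from the warmup and cosine formulas in step.

# Trace the warmup + cosine schedule on a single dummy parameter.
param = nn.Parameter(torch.zeros(1))
opt = AdamWWithWarmup([param], lr=1e-3, warmup_steps=100, total_steps=1000)
history = []
for _ in range(1000):
    param.grad = torch.zeros(1)   # dummy gradient so step() updates the state
    opt.step()
    history.append(opt.param_groups[0]['lr'])
print(history[0], history[99], history[999])   # ramps up, peaks at 1e-3, decays to ~0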
class AdvancedLRScheduler:
"""高度な学習率スケジューラー"""
def __init__(self, optimizer: torch.optim.Optimizer, schedule_type: str = 'cosine'):
self.optimizer = optimizer
self.schedule_type = schedule_type
self.base_lrs = [group['lr'] for group in optimizer.param_groups]
def step(self, epoch: int, total_epochs: int):
if self.schedule_type == 'cosine':
self._cosine_annealing(epoch, total_epochs)
elif self.schedule_type == 'polynomial':
self._polynomial_decay(epoch, total_epochs)
elif self.schedule_type == 'exponential':
            self._exponential_decay(epoch)
def _cosine_annealing(self, epoch: int, total_epochs: int):
"""コサインアニーリング"""
factor = 0.5 * (1 + math.cos(math.pi * epoch / total_epochs))
self._update_lr(factor)
def _polynomial_decay(self, epoch: int, total_epochs: int, power: float = 0.9):
"""多項式減衰"""
factor = (1 - epoch / total_epochs) ** power
self._update_lr(factor)
def _exponential_decay(self, epoch: int, decay_rate: float = 0.95):
"""指数減衰"""
factor = decay_rate ** epoch
self._update_lr(factor)
def _update_lr(self, factor: float):
"""学習率更新"""
for group, base_lr in zip(self.optimizer.param_groups, self.base_lrs):
group['lr'] = base_lr * factor
Distributed Training and Large-Scale Models
Data Parallelism and Model Parallelism
class DistributedTrainer:
"""
分散学習トレーナー
対応:
- DataParallel (DP)
- DistributedDataParallel (DDP)
- Model Parallel
- Pipeline Parallel
"""
def __init__(
self,
model: nn.Module,
rank: int,
world_size: int,
backend: str = 'nccl'
):
self.rank = rank
self.world_size = world_size
        # Initialize the distributed process group
torch.distributed.init_process_group(
backend=backend,
rank=rank,
world_size=world_size
)
        # Bind this process to its GPU
torch.cuda.set_device(rank)
device = torch.device(f'cuda:{rank}')
        # Wrap the model in DDP
self.model = nn.parallel.DistributedDataParallel(
model.to(device),
device_ids=[rank],
output_device=rank,
find_unused_parameters=True
)
    def train_epoch(self, train_loader, optimizer, scheduler, scaler, epoch: int):
        """Run one epoch of distributed training."""
        self.model.train()
        total_loss = 0
        # Reshuffle the DistributedSampler for this epoch
        train_loader.sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
data = data.cuda(non_blocking=True)
target = target.cuda(non_blocking=True)
optimizer.zero_grad()
            # Mixed-precision forward pass
with torch.cuda.amp.autocast():
output = self.model(data)
loss = F.cross_entropy(output, target)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
total_loss += loss.item()
if batch_idx % 100 == 0 and self.rank == 0:
print(f'Batch {batch_idx}, Loss: {loss.item():.4f}')
scheduler.step()
return total_loss / len(train_loader)
def cleanup(self):
"""分散プロセス終了処理"""
torch.distributed.destroy_process_group()
class PipelineParallelModel(nn.Module):
"""パイプライン並列モデル"""
def __init__(self, model_parts: list, devices: list):
super().__init__()
self.model_parts = nn.ModuleList(model_parts)
self.devices = devices
        # Move each partition to its assigned device
for part, device in zip(self.model_parts, self.devices):
part.to(device)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""パイプライン実行"""
for part, device in zip(self.model_parts, self.devices):
x = x.to(device)
x = part(x)
return x
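DistributedTrainer leaves process launching to the caller. A common single-node pattern, sketched below under the assumption of a multi-GPU machine, uses torch.multiprocessing.spawn with the default env:// rendezvous; _worker and its omitted data loading are hypothetical placeholders.

import os
import torch.multiprocessing as mp

def _worker(rank: int, world_size: int):
    # Hypothetical per-process entry point; dataset and loader setup are omitted.
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '29500')
    model = AdvancedResNet(layers=[2, 2, 2, 2], num_classes=10)
    trainer = DistributedTrainer(model, rank=rank, world_size=world_size)
    # ... build DistributedSampler-backed loaders and call trainer.train_epoch ...
    trainer.cleanup()

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(_worker, args=(world_size,), nprocs=world_size, join=True)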
Practical Training Techniques
Advanced Data Augmentation
class AdvancedAugmentation:
"""
高度なデータ拡張技術
含まれる技術:
- MixUp
- CutMix
- AutoAugment
- RandAugment
"""
def __init__(self, policy: str = 'randaugment'):
self.policy = policy
def mixup_data(self, x: torch.Tensor, y: torch.Tensor, alpha: float = 1.0):
"""MixUp データ拡張"""
if alpha > 0:
lam = np.random.beta(alpha, alpha)
else:
lam = 1
batch_size = x.size(0)
index = torch.randperm(batch_size).cuda()
mixed_x = lam * x + (1 - lam) * x[index, :]
y_a, y_b = y, y[index]
return mixed_x, y_a, y_b, lam
def cutmix_data(self, x: torch.Tensor, y: torch.Tensor, alpha: float = 1.0):
"""CutMix データ拡張"""
lam = np.random.beta(alpha, alpha)
batch_size = x.size(0)
index = torch.randperm(batch_size).cuda()
_, _, H, W = x.shape
cut_rat = np.sqrt(1. - lam)
cut_w = int(W * cut_rat)
cut_h = int(H * cut_rat)
        # Random crop region
cx = np.random.randint(W)
cy = np.random.randint(H)
bbx1 = np.clip(cx - cut_w // 2, 0, W)
bby1 = np.clip(cy - cut_h // 2, 0, H)
bbx2 = np.clip(cx + cut_w // 2, 0, W)
bby2 = np.clip(cy + cut_h // 2, 0, H)
x[:, :, bby1:bby2, bbx1:bbx2] = x[index, :, bby1:bby2, bbx1:bbx2]
        # Adjust the mixing ratio to the actual patch area
lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (W * H))
return x, y, y[index], lam
class EarlyStopping:
"""早期停止の実装"""
def __init__(self, patience: int = 7, min_delta: float = 0, restore_best_weights: bool = True):
self.patience = patience
self.min_delta = min_delta
self.restore_best_weights = restore_best_weights
self.best_score = None
self.counter = 0
self.best_weights = None
def __call__(self, val_loss: float, model: nn.Module) -> bool:
score = -val_loss
if self.best_score is None:
self.best_score = score
self.save_checkpoint(model)
elif score < self.best_score + self.min_delta:
self.counter += 1
if self.counter >= self.patience:
if self.restore_best_weights:
model.load_state_dict(self.best_weights)
return True
else:
self.best_score = score
self.save_checkpoint(model)
self.counter = 0
return False
    def save_checkpoint(self, model: nn.Module):
        """Store a snapshot of the current best weights."""
        # state_dict() returns references to the live tensors, so clone them;
        # otherwise later training steps would overwrite the "best" weights.
        self.best_weights = {k: v.detach().clone() for k, v in model.state_dict().items()}
# A practical training loop
def advanced_training_loop(
model: nn.Module,
train_loader: torch.utils.data.DataLoader,
val_loader: torch.utils.data.DataLoader,
epochs: int = 100
):
"""高度な学習ループの実装例"""
# 最適化器とスケジューラー
optimizer = AdamWWithWarmup(
model.parameters(),
lr=1e-3,
weight_decay=0.01,
warmup_steps=1000,
total_steps=epochs * len(train_loader)
)
    # Note: AdamWWithWarmup already rescales the learning rate on every step,
    # so this epoch-level scheduler is largely illustrative here.
    scheduler = AdvancedLRScheduler(optimizer, 'cosine')
scaler = torch.cuda.amp.GradScaler()
early_stopping = EarlyStopping(patience=10)
augmentation = AdvancedAugmentation()
best_val_acc = 0
for epoch in range(epochs):
# Training
model.train()
train_loss = 0
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.cuda(), target.cuda()
            # Data augmentation: MixUp or CutMix, chosen at random
if np.random.rand() < 0.5:
data, target_a, target_b, lam = augmentation.mixup_data(data, target)
optimizer.zero_grad()
with torch.cuda.amp.autocast():
output = model(data)
loss = lam * F.cross_entropy(output, target_a) + \
(1 - lam) * F.cross_entropy(output, target_b)
else:
data, target_a, target_b, lam = augmentation.cutmix_data(data, target)
optimizer.zero_grad()
with torch.cuda.amp.autocast():
output = model(data)
loss = lam * F.cross_entropy(output, target_a) + \
(1 - lam) * F.cross_entropy(output, target_b)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
train_loss += loss.item()
# Validation
model.eval()
val_loss = 0
correct = 0
with torch.no_grad():
for data, target in val_loader:
data, target = data.cuda(), target.cuda()
with torch.cuda.amp.autocast():
output = model(data)
val_loss += F.cross_entropy(output, target).item()
pred = output.argmax(dim=1)
correct += pred.eq(target).sum().item()
val_acc = correct / len(val_loader.dataset)
avg_val_loss = val_loss / len(val_loader)
print(f'Epoch {epoch}: Train Loss: {train_loss/len(train_loader):.4f}, '
f'Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}')
        # Epoch-level learning-rate update
scheduler.step(epoch, epochs)
        # Early-stopping check
if early_stopping(avg_val_loss, model):
print(f'Early stopping at epoch {epoch}')
break
        # Save the best model so far
if val_acc > best_val_acc:
best_val_acc = val_acc
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'val_acc': val_acc,
}, 'best_model.pth')
print(f'Training completed. Best validation accuracy: {best_val_acc:.4f}')
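As a smoke test, the loop can be driven with random tensors wrapped in DataLoaders (assumes a CUDA device; note that it writes best_model.pth to the working directory):

# Smoke-test the training loop on random data; shapes and sizes are illustrative.
from torch.utils.data import DataLoader, TensorDataset

train_ds = TensorDataset(torch.randn(256, 3, 224, 224), torch.randint(0, 10, (256,)))
val_ds = TensorDataset(torch.randn(64, 3, 224, 224), torch.randint(0, 10, (64,)))
model = AdvancedResNet(layers=[2, 2, 2, 2], num_classes=10).cuda()
advanced_training_loop(
    model,
    DataLoader(train_ds, batch_size=32, shuffle=True),
    DataLoader(val_ds, batch_size=32),
    epochs=2,
)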
Summary
This article has provided a comprehensive look at advanced deep learning implementation with PyTorch.
Skills covered
- Theoretical understanding: the mathematical formulation of the attention mechanism
- Architecture design: advanced Transformer and CNN implementations
- Optimization techniques: mixed-precision training and gradient checkpointing
- Distributed training: DataParallel and DistributedDataParallel
- Practical techniques: advanced data augmentation and early stopping
Toward production use
- Scaling up: handling larger datasets and models
- Production deployment: inference optimization via TorchScript and ONNX export
- MLOps integration: model management, versioning, and A/B testing
- Custom implementations: architecture design driven by business requirements
Building industrial-grade AI systems requires a deep understanding of these fundamentals and hands-on implementation experience. Keep learning and practicing to develop ever more advanced AI engineering skills.
The implementation examples in this article target PyTorch 2.0+. When running the code, use the version appropriate for your environment.