このノートブックでは、Databricks サーバーレス GPU コンピューティングを使用する 8 H100 GPU の大規模な 120B パラメーター GPT-OSS モデルの監視対象微調整 (SFT) を示します。 このトレーニングでは、次の内容が活用されます。
- FSDP (完全にシャード化されたデータ並列):GPU 全体のモデル パラメーター、グラデーション、オプティマイザーの状態をシャード化して、1 つの GPU に収まらない大規模なモデルのトレーニングを可能にします。
- DDP (分散データ並列):トレーニングをより高速に行う複数の GPU にトレーニングを分散します。
- LoRA (Low-Rank 適応): 小さなアダプター 層を追加してトレーニング可能なパラメーターの数を減らし、微調整をより効率的にします。
- TRL (トランスフォーマー強化学習):監視による微調整のための SFTTrainer を提供します。
remote=Falseを設定し、16 個の GPU を指定することで、16 個の GPU にまたがるマルチノード トレーニングに拡張できます。
必要なパッケージをインストールする
分散トレーニングとモデルの微調整に必要なライブラリをインストールします。
-
trl:SFTトレーニング用トランスフォーマー強化学習ライブラリ -
peft: LoRA アダプターの Parameter-Efficient Fine-Tuning -
transformers:フェイストランスフォーマーライブラリを抱きしめる -
datasets: トレーニング データセットを読み込む場合 -
accelerate: 分散トレーニング オーケストレーションの場合 -
hf_transfer: Hugging Face からのモデルのダウンロードを高速化する
%pip install "trl==1.1.0"
%pip install "peft==0.19.1"
%pip install "transformers==5.5.4"
%pip install "fsspec==2024.9.0"
%pip install "huggingface_hub==1.11.0"
%pip install "datasets==3.2.0"
%pip install "accelerate==1.13.0"
%restart_python
FSDP を使用して分散トレーニング関数を定義する
このセルは、 @distributed デコレーターを使用して 8 H100 GPU で実行されるトレーニング関数を定義します。 この関数には次のものが含まれます。
- モデルの読み込み: bfloat16 精度でモデル GPT-OSS 120B パラメーターを読み込みます
- LoRA 構成: ランク 16 の Low-Rank 適応を適用して、トレーニング可能なパラメーターを減らします
- FSDP セットアップ: 自動レイヤー ラッピングとアクティブ化チェックポイント処理を使用して、完全シャード データ並列を構成する
- トレーニング構成: バッチ サイズ、学習率、勾配累積、およびその他のハイパーパラメーターを設定します
- データセット: 微調整のために HuggingFaceH4/Multilingual-Thinking データセットを使用します
この関数は、FSDP ラッピング用のトランスフォーマー ブロック クラスを自動的に検出し、すべての GPU で分散トレーニング調整を処理します。
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_model_name", "gpt-oss-120b-peft")
dbutils.widgets.text("uc_volume", "checkpoints")
dbutils.widgets.text("model", "openai/gpt-oss-120b")
dbutils.widgets.text("dataset_path", "HuggingFaceH4/Multilingual-Thinking")
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_MODEL_NAME = dbutils.widgets.get("uc_model_name")
UC_VOLUME = dbutils.widgets.get("uc_volume")
HF_MODEL_NAME = dbutils.widgets.get("model")
DATASET_PATH = dbutils.widgets.get("dataset_path")
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_MODEL_NAME: {UC_MODEL_NAME}")
print(f"UC_VOLUME: {UC_VOLUME}")
print(f"HF_MODEL_NAME: {HF_MODEL_NAME}")
print(f"DATASET_PATH: {DATASET_PATH}")
OUTPUT_DIR = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{UC_MODEL_NAME}"
print(f"OUTPUT_DIR: {OUTPUT_DIR}")
from serverless_gpu import distributed
@distributed(gpus=8, gpu_type='H100')
def train_gpt_oss_fsdp_120b():
"""
Fine-tune a 120B-class model with TRL SFTTrainer + FSDP2 on H100s.
Uses LoRA + activation ckpt + full_shard auto_wrap.
"""
# --- imports inside for pickle safety ---
import os, torch, torch.distributed as dist
from transformers import AutoModelForCausalLM, AutoTokenizer, Mxfp4Config
from trl import SFTTrainer, SFTConfig
from datasets import load_dataset
from peft import LoraConfig, get_peft_model
# ---------- DDP / CUDA binding ----------
local_rank = int(os.environ.get("LOCAL_RANK", "0"))
torch.cuda.set_device(local_rank)
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
os.environ.setdefault("NCCL_DEBUG", "WARN")
os.environ.setdefault("CUDA_LAUNCH_BLOCKING", "0")
os.environ.setdefault("TORCH_NCCL_ASYNC_ERROR_HANDLING", "1") # replaces NCCL_ASYNC_ERROR_HANDLING
# ---------- Config ----------
MAX_LENGTH = 2048
PER_DEVICE_BATCH = 1 # start conservative for 120B
GRAD_ACCUM = 4 # tune for throughput
LR = 1.5e-4
EPOCHS = 1
is_main = int(os.environ.get("RANK", "0")) == 0
world_size = int(os.environ.get("WORLD_SIZE", "1"))
if is_main:
print("=" * 60)
print("FSDP (full_shard) launch for 120B")
print(f"WORLD_SIZE={world_size} | LOCAL_RANK={local_rank}")
print("=" * 60)
# ---------- Tokenizer ----------
tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_NAME)
if tokenizer.pad_token_id is None and tokenizer.eos_token_id is not None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.model_max_length = MAX_LENGTH
tokenizer.truncation_side = "right"
# ---------- Model ----------
# IMPORTANT: no device_map, no .to(device) — let Trainer/Accelerate+FSDP handle placement
# low_cpu_mem_usage helps with massive checkpoints (still needs decent host RAM)
quantization_config = Mxfp4Config(dequantize=True)
model = AutoModelForCausalLM.from_pretrained(
HF_MODEL_NAME,
dtype=torch.bfloat16,
attn_implementation="eager",
quantization_config=quantization_config,
use_cache=False, # needed for grad ckpt
low_cpu_mem_usage=True,
)
# ---------- LoRA ----------
# the following config works
# include MoE layers as well.
peft_config = LoraConfig(
r=32,
lora_alpha=32,
target_modules="all-linear",
rank_pattern={
"mlp.experts.gate_up_proj": 8,
"mlp.experts.down_proj": 8
},
target_parameters=["mlp.experts.gate_up_proj", "mlp.experts.down_proj"],
lora_dropout=0.0,
bias="none",
task_type="CAUSAL_LM",
)
model = get_peft_model(model, peft_config)
# Cast all parameters to bfloat16 so FSDP sees a uniform dtype
# (LoRA adapters are initialized in float32 by default)
model = model.to(torch.bfloat16)
if is_main:
model.print_trainable_parameters()
# ---------- Data ----------
dataset = load_dataset("HuggingFaceH4/Multilingual-Thinking", split="train")
if is_main:
print(f"Dataset size: {len(dataset)}")
# ---------- FSDP settings ----------
def infer_transformer_blocks_for_fsdp(model):
COMMON = {
"LlamaDecoderLayer", "MistralDecoderLayer", "MixtralDecoderLayer",
"Qwen2DecoderLayer", "Gemma2DecoderLayer", "Phi3DecoderLayer",
"GPTNeoXLayer", "MPTBlock", "BloomBlock", "FalconDecoderLayer",
"DecoderLayer", "GPTJBlock", "OPTDecoderLayer"
}
hits = set()
for _, m in model.named_modules():
name = m.__class__.__name__
if name in COMMON:
hits.add(name)
# Fallback: grab anything that *looks* like a decoder block
if not hits:
for _, m in model.named_modules():
name = m.__class__.__name__
if any(s in name for s in ["Block", "DecoderLayer", "Layer"]) and "Embedding" not in name:
hits.add(name)
return sorted(hits)
fsdp_wrap_classes = infer_transformer_blocks_for_fsdp(model)
if not fsdp_wrap_classes:
raise RuntimeError("Could not infer transformer block classes for FSDP wrapping; "
"print(model) and add the block class explicitly.")
training_args = SFTConfig(
output_dir=OUTPUT_DIR,
num_train_epochs=EPOCHS,
per_device_train_batch_size=PER_DEVICE_BATCH,
gradient_accumulation_steps=GRAD_ACCUM,
learning_rate=LR,
warmup_ratio=0.03,
lr_scheduler_type="cosine",
bf16=True,
logging_steps=5,
logging_strategy="steps",
save_strategy="no",
report_to="none",
ddp_find_unused_parameters=False,
dataloader_pin_memory=True,
max_length=MAX_LENGTH,
gradient_checkpointing=False,
# ---- FSDP2 knobs ----
fsdp="full_shard auto_wrap",
fsdp_config={
"version": 2,
"fsdp_transformer_layer_cls_to_wrap": fsdp_wrap_classes,
"reshard_after_forward": True,
"activation_checkpointing": True, # <- use activation ckpt (not gradient)
"xla": False,
"limit_all_gathers": True,
},
)
# ---------- Trainer ----------
trainer = SFTTrainer(
model=model,
args=training_args,
train_dataset=dataset,
processing_class=tokenizer,
)
# verify distributed init & FSDP
rank = int(os.getenv("RANK", "0"))
print(f"[rank {rank}] dist.is_initialized() -> {dist.is_initialized()}")
acc = getattr(trainer, "accelerator", None)
print(f"[rank {rank}] accelerator.distributed_type = {getattr(getattr(acc,'state',None),'distributed_type','n/a')}")
print(f"[rank {rank}] accelerator.num_processes = {getattr(acc, 'num_processes', 'n/a')}")
# ---------- Train ----------
result = trainer.train()
if is_main:
print("\nTraining complete (FSDP).")
print(result.metrics)
分散トレーニング ジョブを実行する
8 個の H100 GPU でトレーニング関数を実行します。
@distributed デコレータは、適切な分散セットアップを使用して、すべての GPU でトレーニングを開始するオーケストレーションを処理します。
train_gpt_oss_fsdp_120b.distributed()
次のステップ
- マルチ GPU とマルチノード分散トレーニング
- サーバーレス GPU コンピューティングのベスト プラクティス
- サーバーレス GPU コンピューティングに関する問題のトラブルシューティング
- PEFT ドキュメント
- TRL のドキュメント