Multi-node training
Traditional multi-node distributed training requires manual coordination—discovering node IPs, assigning ranks, configuring network communication, and setting up shared storage. Flow eliminates this complexity through automatic coordination.
Instead of managing infrastructure, you focus on your training code:
@app.function(
    gpu="h100:8",
    num_instances=4,
    distributed_mode="auto"
)
def distributed_training():
    # Flow automatically sets: RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT
    dist.init_process_group(backend="nccl")
    # Your training code here...
Flow handles node discovery, rank assignment, environment setup, shared storage, and debugging tools. Here's how each piece works.
Automatic Coordination
Flow's rendezvous system eliminates manual node coordination. When you specify num_instances > 1, Flow automatically handles node discovery, rank assignment, and environment setup.
From Single-Node to Multi-Node
Convert single-node training to multi-node by adding two parameters:
from flow import FlowApp
import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

app = FlowApp()

@app.function(
    gpu="h100:8",
    num_instances=4,          # Flow coordinates 4 nodes automatically
    distributed_mode="auto",  # Enable automatic rendezvous
    image="nvcr.io/nvidia/pytorch:23.10-py3"
)
def distributed_training():
    """32-GPU distributed training with automatic coordination."""
    # Flow automatically sets these environment variables:
    node_rank = int(os.environ["FLOW_NODE_RANK"])  # 0, 1, 2, 3
    num_nodes = int(os.environ["FLOW_NUM_NODES"])  # 4
    main_ip = os.environ["FLOW_MAIN_IP"]           # Master node IP

    # Standard PyTorch distributed variables (also set by Flow):
    # RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT
    print(f"Node {node_rank}/{num_nodes} - Master: {main_ip}")

    # Standard PyTorch distributed setup—no changes needed
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # Your existing model training code
    model = create_model().cuda()
    model = DDP(model, device_ids=[local_rank])

    for epoch in range(10):
        loss = train_epoch(model)
        if dist.get_rank() == 0:  # Only master logs
            print(f"Epoch {epoch}: loss={loss:.4f}")

    return {"node": node_rank, "status": "complete"}

# Launch 32 GPUs with automatic coordination
result = distributed_training.remote()
How Flow's Rendezvous Works
When you submit the task, Flow's coordination system:
Discovers all instances via the provider API
Assigns deterministic ranks based on instance IDs
Elects a leader (rank 0) and resolves its private IP
Sets environment variables for PyTorch distributed training
Synchronizes startup so all nodes begin together
Your training code uses standard PyTorch distributed patterns—dist.init_process_group(), DDP, dist.barrier()—without any Flow-specific modifications.
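To confirm that every rank actually joined the same process group, you can run a quick collective sanity check right after initialization. A minimal sketch (the verify_world_size helper is illustrative, not part of Flow):

import os
import torch
import torch.distributed as dist

def verify_world_size():
    """All-reduce a ones tensor; the sum should equal WORLD_SIZE on every rank."""
    device = torch.device("cuda", int(os.environ["LOCAL_RANK"]))
    ones = torch.ones(1, device=device)
    dist.all_reduce(ones, op=dist.ReduceOp.SUM)
    assert int(ones.item()) == int(os.environ["WORLD_SIZE"]), "process group is incomplete"
    if dist.get_rank() == 0:
        print(f"All {int(ones.item())} ranks joined the process group")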
Environment Variables Set by Flow
Flow automatically configures the distributed training environment:
# Flow-specific coordination
FLOW_NODE_RANK=0 # This node's rank (0, 1, 2, 3)
FLOW_NUM_NODES=4 # Total number of nodes
FLOW_MAIN_IP=10.0.1.5 # Private IP of rank 0 node
# PyTorch distributed training
RANK=0 # Global process rank (0-31 for 32 GPUs)
LOCAL_RANK=0 # GPU index within this node (0-7)
WORLD_SIZE=32 # Total number of processes/GPUs
MASTER_ADDR=10.0.1.5 # Address for process group coordination
MASTER_PORT=29500 # Port for process group coordination
No manual configuration required—Flow handles the entire coordination process automatically.
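If you want to cross-check the two sets of variables from inside a task, the PyTorch values follow directly from the Flow-specific ones. A minimal sketch, assuming 8 GPUs per node as in the example above:

import os

gpus_per_node = 8  # matches gpu="h100:8"
node_rank = int(os.environ["FLOW_NODE_RANK"])
num_nodes = int(os.environ["FLOW_NUM_NODES"])
local_rank = int(os.environ["LOCAL_RANK"])

# Standard PyTorch rank arithmetic: world size and global rank follow from the node layout
assert int(os.environ["WORLD_SIZE"]) == num_nodes * gpus_per_node           # 4 x 8 = 32
assert int(os.environ["RANK"]) == node_rank * gpus_per_node + local_rank    # e.g. node 2, GPU 3 -> rank 19
assert os.environ["MASTER_ADDR"] == os.environ["FLOW_MAIN_IP"]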
Shared Storage
Multi-node training requires storage that all nodes can access simultaneously—shared datasets, model checkpoints, and training logs. Flow lets you specify volumes that automatically mount across all nodes in your deployment.
File Storage for Multi-Node Access
When you specify interface="file" in your volume configuration, Flow ensures the storage is accessible across all nodes:
@app.function(
    gpu="h100:8",
    num_instances=4,
    volumes={
        "/shared/datasets": {
            "name": "training-data",
            "size_gb": 1000,
            "interface": "file"  # Shared access across nodes
        },
        "/shared/checkpoints": {
            "name": "model-checkpoints",
            "size_gb": 500,
            "interface": "file"  # All nodes can read/write
        }
    }
)
def training_with_shared_storage():
    """Distributed training with shared datasets and checkpoints."""
    import os
    from pathlib import Path

    node_rank = int(os.environ["FLOW_NODE_RANK"])

    # All nodes see the same files
    datasets_path = Path("/shared/datasets")
    checkpoints_path = Path("/shared/checkpoints")

    dist.init_process_group(backend="nccl")

    # Load dataset (accessible from all nodes)
    dataset = load_dataset_from_path(datasets_path)
    model = create_model().cuda()
    model = DDP(model)

    # Coordinated checkpointing
    for epoch in range(10):
        loss = train_epoch(model, dataset)

        # Only master node saves checkpoints
        if node_rank == 0:
            checkpoint_path = checkpoints_path / f"epoch_{epoch}.pt"
            torch.save(model.state_dict(), checkpoint_path)
            print(f"Checkpoint saved: {checkpoint_path}")

        # All nodes wait for checkpoint save
        dist.barrier()

    return {"epochs_completed": 10, "checkpoints_saved": True}

result = training_with_shared_storage.remote()
S3 Dataset Integration
For read-only datasets, you can mount S3 buckets that appear on all nodes:
@app.function(
    gpu="h100:8",
    num_instances=4,
    distributed_mode="auto"
)
def training_with_s3_data():
    """All nodes get identical S3 mount access."""
    # S3 data available at /data on all nodes
    dataset_path = "/data"

    dist.init_process_group(backend="nccl")

    # Load dataset from S3 mount
    dataset = load_dataset_from_s3_mount(dataset_path)
    model = create_model().cuda()
    model = DDP(model)

    # Train with shared S3 data
    for epoch in range(10):
        loss = train_epoch(model, dataset)

    return {"training": "complete"}

# Submit with S3 mount (automatically available on all nodes)
from flow import Flow

flow = Flow()
task = flow.run(
    training_with_s3_data._build_task_config(""),
    mounts={"/data": "s3://ml-datasets/imagenet"}
)
Volume Configuration
Flow supports different storage configurations:
File storage (interface="file"): Shared access across all nodes. Use for datasets, checkpoints, and logs that need multi-node read/write access.
S3 mounts: Read-only access to S3 buckets, mounted on all nodes. Useful for large datasets without persistent storage costs.
When you specify volumes in a multi-node function, Flow ensures they appear simultaneously on every node, eliminating manual synchronization between nodes.
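Because every node sees the same checkpoint directory, resuming after an interruption follows the same pattern in reverse: each node looks for the latest checkpoint on the shared volume and loads it before training continues. A hedged sketch building on the epoch_N.pt naming from the example above (find_latest_checkpoint and resume_if_possible are illustrative helpers, not a Flow API):

import re
import torch
import torch.distributed as dist
from pathlib import Path

def find_latest_checkpoint(checkpoints_path: Path):
    """Return the newest epoch_N.pt on the shared volume, or None when starting fresh."""
    checkpoints = sorted(
        checkpoints_path.glob("epoch_*.pt"),
        key=lambda p: int(re.search(r"epoch_(\d+)", p.name).group(1)),
    )
    return checkpoints[-1] if checkpoints else None

def resume_if_possible(model, checkpoints_path: Path, local_rank: int):
    latest = find_latest_checkpoint(checkpoints_path)
    if latest is not None:
        # Every node reads the same file from the shared volume and maps it onto its own GPU
        state = torch.load(latest, map_location=f"cuda:{local_rank}")
        model.load_state_dict(state)
    # No node should start training until everyone has finished loading
    dist.barrier()
    return latest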
Debugging Multi-Node Training
When distributed training fails, you need to inspect individual nodes—check GPU utilization, NCCL communication, memory usage, and process status. Flow provides node-specific SSH access and log aggregation to simplify multi-node debugging.
Node-Specific SSH Access
Flow lets you SSH directly to any node in your multi-node deployment:
# SSH to specific nodes in your cluster
flow ssh training-task-xyz --node 0 # Master node
flow ssh training-task-xyz --node 1 # Worker node 1
flow ssh training-task-xyz --node 2 # Worker node 2
flow ssh training-task-xyz --node 3 # Worker node 3
# Run commands on specific nodes
flow ssh training-task-xyz --node 1 --command "nvidia-smi"
flow ssh training-task-xyz --node 2 --command "ps aux | grep python"
Debugging Workflows
Check GPU utilization across all nodes to identify performance issues:
# Monitor GPU usage across all nodes
for node in {0..3}; do
  echo "=== Node $node GPU Status ==="
  flow ssh training-task-xyz --node $node --command "nvidia-smi --query-gpu=utilization.gpu --format=csv,noheader"
done

# Check NCCL communication logs
for node in {0..3}; do
  echo "=== Node $node NCCL Status ==="
  flow ssh training-task-xyz --node $node --command "grep 'NCCL' /var/log/training.log | tail -5"
done
Log Aggregation
Flow aggregates logs from all nodes with automatic node identification:
# Get logs from all nodes, timestamped and labeled
task = flow.get_task("training-task-xyz")
logs = task.logs()
# Example output:
# [Node 0] [10:30:15] NCCL: Connected to all peers successfully
# [Node 1] [10:30:16] NCCL: Connection timeout to 10.0.1.5:29500
# [Node 2] [10:30:16] NCCL: Connected to all peers successfully
# [Node 3] [10:30:16] NCCL: Connected to all peers successfully
# Get logs from specific nodes
node_1_logs = task.logs(node=1) # Debug problematic node
print("Node 1 specific logs:", node_1_logs)
# Stream logs in real-time
for line in task.logs(follow=True):
    print(line)
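Because each aggregated line carries a [Node N] prefix, the logs can also be scanned programmatically to spot which node is reporting problems. A small sketch based on the output format shown above (it assumes task.logs() returns the aggregated text as a single string):

import re
from collections import Counter

def nccl_errors_by_node(log_lines):
    """Count lines mentioning NCCL timeouts or failures, grouped by [Node N] prefix."""
    counts = Counter()
    for line in log_lines:
        node = re.match(r"\[Node (\d+)\]", line)
        if node and re.search(r"NCCL.*(timeout|error|fail)", line, re.IGNORECASE):
            counts[int(node.group(1))] += 1
    return counts

# Assumes task.logs() returns one string of labeled lines, as in the example output above
problem_nodes = nccl_errors_by_node(task.logs().splitlines())
print(problem_nodes)  # e.g. Counter({1: 3}) points at node 1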
Common Debugging Scenarios
NCCL Communication Issues: Use node-specific SSH to check network connectivity and NCCL environment variables on individual nodes.
Uneven GPU Utilization: SSH to underutilized nodes to check process status, memory usage, and data loading bottlenecks.
Hanging Training: Check if specific nodes are stuck in barriers or collective operations using per-node process inspection.
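For the hanging case, it also helps to have each rank periodically dump its own Python stack, so that a per-node SSH session or that node's logs immediately show where it is blocked. A minimal sketch using the standard library's faulthandler module (the 10-minute interval is arbitrary):

import faulthandler
import sys

# Dump every thread's stack to stderr every 10 minutes until cancelled.
# A rank stuck in dist.barrier() or a collective shows the blocked frame
# in that node's logs (or via flow ssh ... --node N).
faulthandler.dump_traceback_later(600, repeat=True, file=sys.stderr)

# ... training loop ...

# Cancel once training finishes cleanly
faulthandler.cancel_dump_traceback_later()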
Flow's node-specific access eliminates the need to manage separate SSH connections or correlate logs manually across multiple instances.
Production Configuration
Production multi-node training requires cost controls and performance optimization. Flow provides built-in safeguards and configuration options for production workloads.
Cost Protection
Flow includes automatic cost protection mechanisms for large-scale training:
@app.function(
    gpu="h100-80gb.sxm.8x",      # 8x H100 per node
    num_instances=8,             # 64 GPUs total
    # Cost protection
    priority="med",              # Balanced pricing tier
    max_price_per_hour=50.0,     # $50 per instance (total: 8 × $50 = $400/hour)
    max_run_time_hours=12,       # Auto-terminate after 12 hours
    # Production image and environment
    image="nvcr.io/nvidia/pytorch:24.01-py3",
    environment={
        "NCCL_DEBUG": "WARN",
        "NCCL_TREE_THRESHOLD": "0",  # Force ring algorithms for large clusters
        "CUDA_VISIBLE_DEVICES": "0,1,2,3,4,5,6,7"
    }
)
def production_training(learning_rate: float = 1e-4, batch_size: int = 32):
    # Training code here (defines epoch_loss and world_size)...
    return {"final_loss": epoch_loss, "epochs": 100, "gpus_used": world_size}

# Monitor costs during training
task = production_training.spawn(learning_rate=5e-5)
print(f"Current cost: ${task.estimated_cost:.2f}")
print(f"Cost cap: ${task.max_price_per_hour * task.num_instances:.2f}/hour")
Performance Optimization
Flow automatically provisions high-speed networking for multi-node workloads. For explicit control, specify interconnect requirements:
@app.function(
    gpu="h100:8",
    num_instances=16,                     # 128 GPUs
    internode_interconnect="InfiniBand",  # Force InfiniBand networking
    intranode_interconnect="SXM5",        # Force SXM5 within nodes
    environment={
        "NCCL_IB_DISABLE": "0",      # Enable InfiniBand for NCCL
        "NCCL_NET_GDR_LEVEL": "3"    # GPU Direct RDMA
    }
)
def high_performance_training():
    """Optimized for maximum multi-node performance."""
    pass
The complete source code for this guide and all examples are available in the Flow repository on GitHub.