flow.sdk.client
Unified GPU workload orchestration.
A concise, explicit interface for submitting, monitoring, and managing GPU workloads across providers. Designed for the common path with clear escape hatches.
CatalogEntry
Instance catalog entry dictionary.
Client
Clean SDK facade for Flow - simple for 90%, powerful for 10%.
This client provides a simplified interface to Flow's functionality, abstracting away complexity while maintaining power user access.
Client.cancel
cancel(self, task_id: str, force: bool = False) -> bool
Cancel a running task.
Parameters:
task_id: Task to cancel
force: Force termination
Returns:
True if cancelled successfully
Client.provider_port
provider_port(self)
Get direct access to provider port (power users).
Returns:
Provider port for advanced operations
Client.reserve
reserve(
self,
gpu: str,
duration_hours: int = 1,
auto_renew: bool = False
) -> str
Reserve GPU capacity.
Parameters:
gpu: GPU type to reserve
duration_hours: Reservation duration
auto_renew: Auto-renew reservation
Returns:
Reservation ID
Client.run
run(
self,
command: str | list[str] | TaskSpec | TaskConfig,
gpu: Optional[str] = None,
cpus: Optional[int] = None,
memory_gb: Optional[int] = None,
wait: bool = False,
**kwargs
) -> Task
Run a task with specified resources.
Parameters:
command: Command to run or task specification
gpu: GPU type (e.g., "a100", "v100")
cpus: Number of CPUs
memory_gb: Memory in GB
wait: Wait for task completion
**kwargs: Additional task configuration
Returns:
Task handle for monitoring and management
Client.ssh
ssh(self, task_id: str) -> str
Get SSH command for task.
Parameters:
task_id: Task to connect to
Returns:
SSH command string
Client.status
status(self, task_id: Optional[str] = None) -> list[Task]
Get status of task(s).
Parameters:
task_id: Specific task ID or None for all tasks
Returns:
List of tasks with current status
Client.volumes
volumes(self) -> list[Volume]
List available volumes.
Returns:
List of volumes
Flow
Primary client for submitting and managing GPU jobs.
Simple for 90% of use cases (one obvious way to run a task)
Explicit configuration via TaskConfig when needed
Clean access to logs, SSH, volumes, and instance discovery
Flow.cancel
cancel(self, task_id: str) -> None
Request cancellation of a running or pending task.
Flow.close
close(self) -> None
Release provider resources (idempotent).
Flow.create_platform_ssh_key
create_platform_ssh_key(self, name: str, public_key: str) -> dict[str, str]
Create a platform SSH key via provider (preferred).
Falls back to the SSH key manager's ensure-key path when available.
Flow.create_volume
create_volume(
self,
size_gb: int,
name: str | None = None,
interface: Literal['block', 'file'] = 'block',
region: str | None = None
) -> Volume
Create a persistent volume.
Parameters:
size_gb: Capacity in GB.
name: Optional display name (used in volume://name).
interface: "block" (exclusive attach) or "file" (multi-attach).
region: Optional region to create the volume in. When omitted, the provider's configured/default region is used.
Returns:
Volume.
Examples:
Create and attach volumes to a task:
>>> data = flow.create_volume(500, name="datasets")
>>> ckpt = flow.create_volume(100, name="checkpoints")
>>> cfg = TaskConfig(
... name="train",
... instance_type="a100",
... command="python train.py",
... volumes=[
... {"volume_id": data.volume_id, "mount_path": "/data"},
... {"volume_id": ckpt.volume_id, "mount_path": "/ckpts"},
... ],
... )
>>> task = flow.run(cfg)
Flow.delete_platform_ssh_key
delete_platform_ssh_key(self, key_id: str) -> bool
Delete a platform SSH key via provider (preferred), else manager.
Flow.delete_volume
delete_volume(self, volume_id: str) -> None
Delete a volume permanently (no recovery).
Examples:
>>> flow.delete_volume("vol_abc123")
Flow.dev_context
dev_context(self, auto_stop: bool = False) -> DevEnvironment
Context manager for the dev VM.
Parameters:
auto_stop: Stop the VM on context exit.
Returns:
DevEnvironment.
Flow.ensure_default_ssh_key
ensure_default_ssh_key(self) -> str | None
Ensure a default SSH key exists for the current project when supported.
Returns the key id when created or already present; None when unsupported.
Flow.find_instances
find_instances(self, requirements: InstanceRequirements, limit: int = 10) -> list[AvailableInstance]
Return available instances that match the given constraints.
Examples:
>>> flow.find_instances({"gpu_type": "a100", "max_price": 8.0}, limit=5)
Flow.get_provider_init
get_provider_init(self) -> IProviderInit
Return the provider's initialization interface.
Flow.get_remote_operations
get_remote_operations(self) -> object
Return the provider's remote operations interface.
Flow.get_reservation
get_reservation(self, reservation_id: str) -> Any | None
Get a reservation if supported by the provider.
Returns None when unsupported.
Flow.get_ssh_key_manager
get_ssh_key_manager(self)
Return the provider's SSH key manager interface.
Flow.get_ssh_tunnel_manager
get_ssh_tunnel_manager(self) -> object
Return the provider's SSH tunnel manager.
Flow.get_task
get_task(self, task_id: str) -> Task
Return a Task handle for an existing job.
Examples:
>>> t = flow.get_task(task_id)
>>> print(t.status)
Flow.get_task_ssh_connection_info
get_task_ssh_connection_info(self, task_id: str) -> tuple[Path | None, str]
Return (ssh_key_path, error_message) for a task, if provider supports it.
Flow.get_user
get_user(self, user_id: str) -> Any | None
Get user info if provider exposes it; otherwise None.
Falls back to raw HTTP via provider.http when available.
Flow.get_user_teammates
get_user_teammates(self, user_id: str) -> Any
Return teammates for a given user when supported; else [].
Flow.list_platform_ssh_keys
list_platform_ssh_keys(self) -> list[dict[str, str]]
List platform SSH keys via provider (preferred).
Falls back to provider init interface if provider does not implement the SSH key methods. If the current provider lacks SSH key support (e.g., local), fall back to calling the Mithril API directly using the configured auth token and base URL. This keeps SSH key management available regardless of the active compute provider.
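The fallback chain described above (provider method, then init interface, then raw HTTP) can be sketched as a generic helper. This is an illustrative pattern, not the SDK's actual implementation; `provider`, `init_iface`, and `http_get` in the usage comment are hypothetical stand-ins:

```python
def first_supported(*attempts):
    """Return the result of the first attempt that is implemented.

    Each attempt is a zero-argument callable; NotImplementedError
    (or a missing method, surfacing as AttributeError) moves the
    chain on to the next candidate. Returns [] if nothing works.
    """
    for attempt in attempts:
        try:
            return attempt()
        except (NotImplementedError, AttributeError):
            continue
    return []

# Hypothetical usage mirroring list_platform_ssh_keys:
# keys = first_supported(
#     provider.list_ssh_keys,          # preferred: provider implements it
#     init_iface.list_ssh_keys,        # fallback: provider init interface
#     lambda: http_get("/ssh-keys"),   # last resort: direct Mithril API call
# )
```

This keeps each capability usable regardless of which interfaces the active provider implements.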
Flow.list_projects
list_projects(self) -> list[dict[str, str]]
List provider projects accessible to the current credentials.
Flow.list_reservations
list_reservations(self, params: dict[str, Any] | None = None) -> list[Any]
List reservations if supported by the provider.
Facet-first; falls back to provider method when available. Returns an empty list when unsupported.
Flow.list_ssh_keys
list_ssh_keys(self, project_id: str | None = None) -> list[dict[str, str]]
List SSH keys (optionally filtered by project).
Flow.list_tasks
list_tasks(
self,
status: TaskStatus | list[TaskStatus] | None = None,
limit: int = 10,
force_refresh: bool = False
) -> list[Task]
List recent tasks, optionally filtered by status.
Examples:
List running tasks and print their names:
>>> from flow.sdk.models import TaskStatus
>>> for t in flow.list_tasks(status=TaskStatus.RUNNING):
... print(t.name)
Flow.list_volumes
list_volumes(self, limit: int = 100) -> list[Volume]
List volumes for the current project (newest first).
Examples:
>>> for v in flow.list_volumes():
... print(v.name, v.size_gb)
Flow.logs
logs(
self,
task_id: str,
follow: bool = False,
tail: int = 100,
stderr: bool = False,
source: str | None = None,
stream: str | None = None
) -> str | Iterator[str]
Return recent logs or stream them in real time.
Parameters:
task_id: The task to read logs from.
follow: If True, stream logs until the task completes.
tail: Number of trailing lines to fetch when follow is False.
stderr: If True, select stderr (may be merged by some providers).
Returns:
str | Iterator[str]: A string (when follow=False) or an iterator of lines.
Examples:
Fetch and print the last 50 lines:
>>> print(flow.logs(task_id, tail=50))
Stream logs and stop after an error:
>>> for line in flow.logs(task_id, follow=True):
... if "ERROR" in line:
... break
Flow.mount_volume
mount_volume(
self,
volume_id: str,
task_id: str,
mount_point: str | None = None
) -> bool
Attach a volume to a task's configuration at an optional mount point.
Examples:
Mount by name to the default path:
>>> flow.mount_volume("datasets", task_id)
Mount by ID to a custom path:
>>> flow.mount_volume("vol_abc123", task_id, mount_point="/volumes/inputs")
Flow.normalize_instance_request
normalize_instance_request(self, gpu_count: int, gpu_type: str | None = None) -> tuple[str, int, str | None]
Normalize a GPU request to an instance_type and instance count.
Provider-first: attempts provider.normalize_instance_request(). Accepts both the legacy tuple shape (instance_type, num_instances, warning) and the dict shape {"instance_type": str, "num_instances": int, ...}.
Robust fallback: uses a simple heuristic (2x/4x/8x) when unsupported or when the provider returns an unexpected shape.
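A minimal sketch of the shape handling and heuristic fallback described above; the exact provider return values and the packing rule shown are assumptions for illustration, not the SDK's internals:

```python
def normalize_result(result, gpu_count, gpu_type):
    """Accept either the legacy tuple shape or the dict shape; fall back
    to a simple 2x/4x/8x packing heuristic on anything unexpected."""
    if isinstance(result, tuple) and len(result) == 3:
        return result  # (instance_type, num_instances, warning)
    if isinstance(result, dict) and "instance_type" in result:
        return (result["instance_type"],
                result.get("num_instances", 1),
                result.get("warning"))
    # Heuristic fallback: pack GPUs onto the largest standard node size.
    for per_node in (8, 4, 2):
        if gpu_count % per_node == 0:
            return (f"{per_node}x{gpu_type}", gpu_count // per_node, None)
    return (gpu_type, gpu_count, None)
```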
Flow.reservation_availability
reservation_availability(self, params: dict[str, Any]) -> list[dict[str, Any]]
Return reservation availability if supported by the provider.
Parameters:
params: Keys include instance_type, region, earliest_start_time, latest_end_time
Returns:
A list of slots (as dicts). Empty list when unsupported or on error.
Flow.resolve_ssh_endpoint
resolve_ssh_endpoint(self, task_id: str, node: int | None = None) -> tuple[str, int]
Resolve SSH endpoint (host, port) for a task (provider fallback).
Flow.run
run(
self,
task: TaskConfig | str | Path,
wait: bool = False,
mounts: str | dict[str, str] | None = None
) -> Task
Submit a task.
Parameters:
task: TaskConfig, a Path to a YAML file, or a string path to YAML.
wait: If True, block until the task is running before returning.
mounts: Optional data mounts; string or mapping of target->source.
Returns:
Task: Handle for status, logs, SSH, cancel, etc.
Examples:
Command as a string with an explicit instance type:
>>> task = flow.run("python train.py --epochs 10", instance_type="a100", wait=True)
Full TaskConfig with volumes and limit price:
>>> from flow.sdk.models import TaskConfig, VolumeSpec
>>> cfg = TaskConfig(
... name="train",
... instance_type="4xa100",
... command=["python", "-m", "torch.distributed.run", "--nproc_per_node=4", "train.py"],
... volumes=[VolumeSpec(size_gb=200, mount_path="/data", name="train-data")],
... max_price_per_hour=12.0,
... )
>>> task = flow.run(cfg)
Capability-based selection (cheapest GPU with >= 40GB):
>>> cfg = TaskConfig(name="infer", min_gpu_memory_gb=40, command="python serve.py")
>>> task = flow.run(cfg)
Flow.shell
shell(
self,
task_id: str,
command: str | None = None,
node: int | None = None,
progress_context = None,
record: bool = False
) -> None
Open an interactive shell or run a command on the task instance.
Examples:
Open a shell:
>>> flow.shell(task_id)
Run a one-off command:
>>> flow.shell(task_id, command="nvidia-smi")
Flow.status
status(self, task_id: str) -> str
Return the task status string (pending, running, completed, failed, cancelled).
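Because status reports one of a fixed set of strings, a poll-until-terminal loop is straightforward to build on top of it. The sketch below is illustrative (the real SDK's wait=True and wait_for_ssh may cover this); get_status and sleep are injectable stand-ins:

```python
TERMINAL_STATES = {"completed", "failed", "cancelled"}

def wait_for_terminal(get_status, sleep=lambda: None, max_polls=1000):
    """Poll a status callable until it reports a terminal state."""
    for _ in range(max_polls):
        state = get_status()
        if state in TERMINAL_STATES:
            return state
        sleep()
    raise TimeoutError("task did not reach a terminal state")

# Hypothetical usage:
# import time
# final = wait_for_terminal(lambda: flow.status(task_id),
#                           sleep=lambda: time.sleep(5))
```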
Flow.submit
submit(
self,
command: str,
gpu: str | None = None,
mounts: str | dict[str, str] | None = None,
instance_type: str | None = None,
wait: bool = False
) -> Task
Submit a shell command with minimal configuration.
Parameters:
command: Passed to the container shell.
gpu: e.g. "a100", "a100:4", or "gpu:40gb". Ignored if instance_type is set.
mounts: Optional data mounts (string or mapping of target->source).
instance_type: Explicit override of the instance type.
wait: If True, block until the task completes.
Returns:
Task.
Examples:
Quick usage with GPU shorthand:
>>> task = flow.submit("python train.py", gpu="a100")
Multiple mounts:
>>> task = flow.submit(
... "torchrun --nproc_per_node=4 train.py",
... gpu="a100:4",
... mounts={
... "/data": "volume://datasets",
... "/models": "s3://bucket/pretrained/",
... },
... wait=True,
... )
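The gpu shorthand accepted by submit suggests a small parser for the three documented forms. This sketch is an illustration of how such shorthand might be interpreted, not the SDK's actual parsing logic, and the returned dict keys are assumed names:

```python
def parse_gpu_shorthand(gpu):
    """Parse "a100", "a100:4", or "gpu:40gb" into a requirements dict."""
    if ":" in gpu:
        head, tail = gpu.split(":", 1)
        if head == "gpu" and tail.endswith("gb"):
            # Capability form: any GPU with at least this much memory.
            return {"min_gpu_memory_gb": int(tail[:-2])}
        # Type-and-count form, e.g. "a100:4".
        return {"gpu_type": head, "gpu_count": int(tail)}
    # Bare type: a single GPU of the named kind.
    return {"gpu_type": gpu, "gpu_count": 1}
```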
Flow.upload_code_to_task
upload_code_to_task(
self,
task_id: str,
source_dir: Path | None = None,
timeout: int = 600,
console: object | None = None,
target_dir: str = '/workspace',
progress_reporter: object | None = None,
git_incremental: bool | None = None
) -> object
Upload code to an existing task (facet/provider fallback).
Prefers provider's richer upload (with progress). Falls back to storage facet upload_directory
if available.
Flow.wait_for_ssh
wait_for_ssh(
self,
task_id: str,
timeout: int = 600,
show_progress: bool = True,
progress_adapter: object | None = None
) -> Task
Block until SSH is ready for the task or time out.
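Internally, a block-until-ready call like this amounts to retrying a connectivity probe under a deadline. A minimal sketch, where probe is a stand-in for an actual SSH reachability check and the clock/sleep parameters exist only to make the loop testable:

```python
import time

def wait_until_ready(probe, timeout=600, interval=5,
                     clock=time.monotonic, sleep=time.sleep):
    """Retry probe() until it returns True or the deadline passes."""
    deadline = clock() + timeout
    while clock() < deadline:
        if probe():
            return True
        sleep(interval)
    raise TimeoutError(f"not ready after {timeout}s")
```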
GPUInstanceDict
GPU instance dictionary returned by _find_gpus_by_memory().
InstanceRequirements
Instance requirements dictionary for find_instances().
TaskDict
Task dictionary returned by the list() method.