flow.sdk.decorators
Decorator-based remote execution.
FlowApp
Flow
with an @function
decorator for remote execution.
FlowApp.function
function(
self,
gpu: str | None = None,
cpu: float | tuple[float, float] | None = None,
memory: int | tuple[int, int] | None = None,
gpu_memory_gb: int | None = None,
image: str | None = None,
retries: int | Retries = 0,
timeout: int | None = None,
volumes: dict[str, Any] | None = None,
environment: dict[str, str] | None = None,
env: dict[str, str] | None = None,
secrets: list[Secret] | None = None,
max_result_size: int | None = 10485760,
kwargs
) -> Callable[[Callable[P, R]], RemoteFunction[P, R]]
Decorator that returns a RemoteFunction
configured with resources.
Parameters:
gpu: GPU spec, for example "a100" or "h100:8". Alternatively set
gpu_memory_gb
to request by memory.cpu: CPU cores as float or (request, limit).
memory: Memory in MB as int or (request, limit).
image: Container image (default: "python:3.11").
retries: Retry policy (int or
Retries
).timeout: Maximum runtime in seconds.
volumes: Mapping of mount_path to volume reference. Each value can be a volume name or a dict with
volume_id
and other fields.environment | env: Environment variables to set.
secrets: Secrets to inject as environment variables.
max_result_size: Maximum allowed size of serialized result (bytes).
**kwargs: Additional
TaskConfig
fields to merge.
Returns:
Callable[[Callable[P, R]], RemoteFunction[P, R]]: A decorator that wraps the function.
Examples:
Minimal configuration:
>>> @app.function(gpu="a100")
... def f(x: int) -> int:
... return x * 2
Full configuration with volumes and env:
>>> @app.function(
... gpu="h100:8",
... cpu=32.0,
... memory=131072,
... image="nvcr.io/nvidia/pytorch:23.10-py3",
... volumes={
... "/data": {"name": "datasets", "size_gb": 500},
... "/ckpts": "model-checkpoints",
... },
... environment={"NCCL_DEBUG": "INFO"},
... )
... def train(cfg: str) -> dict:
... return {"status": "ok"}
RemoteFunction
Wraps a Python function to support .remote()
GPU execution.
Local calls execute the function as-is; .remote()
runs it remotely with an isolated environment and returns the JSON-serializable result.
RemoteFunction.remote
remote(self, args: P.args, kwargs: P.kwargs) -> R
Execute remotely on GPU and return the JSON-serializable result.
Parameters:
*args: Positional arguments for the wrapped function.
**kwargs: Keyword arguments for the wrapped function.
Returns:
R: The function's return value.
Examples:
Simple remote call:
>>> @app.function(gpu="a100")
... def add(x: int, y: int) -> int:
... return x + y
>>> add.remote(2, 3)
Configure retries and timeout:
>>> @app.function(gpu="a100", retries=3, timeout=600)
... def train(cfg_path: str) -> dict:
... return {"ok": True}
>>> train.remote("config.yaml")
RemoteFunction.spawn
spawn(self, args: P.args, kwargs: P.kwargs) -> Task
Submit for async execution and return the Task
immediately.
Parameters:
*args: Positional arguments for the wrapped function.
**kwargs: Keyword arguments for the wrapped function.
Returns:
Task: Handle that can be used to monitor execution.
Examples:
Launch N parallel tasks and stream their last logs:
>>> tasks = [process.spawn(i) for i in range(8)]
>>> for t in tasks:
... t.wait()
... print(t.logs(tail=10))
Last updated