flow.sdk.decorators

Decorator-based remote execution.

FlowApp

Flow with an @function decorator for remote execution.

FlowApp.function

function(
    self,
    gpu: str | None = None,
    cpu: float | tuple[float, float] | None = None,
    memory: int | tuple[int, int] | None = None,
    gpu_memory_gb: int | None = None,
    image: str | None = None,
    retries: int | Retries = 0,
    timeout: int | None = None,
    volumes: dict[str, Any] | None = None,
    environment: dict[str, str] | None = None,
    env: dict[str, str] | None = None,
    secrets: list[Secret] | None = None,
    max_result_size: int | None = 10485760,
    **kwargs
) -> Callable[[Callable[P, R]], RemoteFunction[P, R]]

Decorator that returns a RemoteFunction configured with resources.

Parameters:

  • gpu: GPU spec, for example "a100" or "h100:8".

  • gpu_memory_gb: Alternative to gpu; requests a GPU by memory size in GB.

  • cpu: CPU cores as float or (request, limit).

  • memory: Memory in MB as int or (request, limit).

  • image: Container image (default: "python:3.11").

  • retries: Retry policy (int or Retries).

  • timeout: Maximum runtime in seconds.

  • volumes: Mapping of mount_path to volume reference. Each value can be a volume name or a dict with volume_id and other fields.

  • environment | env: Environment variables to set.

  • secrets: Secrets to inject as environment variables.

  • max_result_size: Maximum allowed size of serialized result (bytes).

  • **kwargs: Additional TaskConfig fields to merge.

Returns:

Callable[[Callable[P, R]], RemoteFunction[P, R]]: A decorator that wraps the function.
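The shapes accepted by gpu, cpu, and memory can be illustrated with a small sketch. The helpers parse_gpu_spec and normalize_range below are hypothetical and not part of the SDK. The sketch assumes that a GPU spec without a ":<count>" suffix means one GPU and that a scalar cpu or memory value is used as both request and limit; the source confirms neither.

```python
from typing import Optional, Tuple, Union

Number = Union[int, float]


def parse_gpu_spec(spec: str) -> Tuple[str, int]:
    """Split a spec such as "h100:8" into (model, count).

    Assumption: a spec without ":<count>" means a single GPU.
    """
    model, sep, count = spec.partition(":")
    return model, int(count) if sep else 1


def normalize_range(
    value: Union[Number, Tuple[Number, Number], None],
) -> Optional[Tuple[Number, Number]]:
    """Turn a scalar or a (request, limit) pair into a (request, limit) pair.

    Assumption: a scalar is used as both request and limit.
    """
    if value is None:
        return None
    if isinstance(value, tuple):
        request, limit = value
        if request > limit:
            raise ValueError("request must not exceed limit")
        return request, limit
    return value, value


gpu = parse_gpu_spec("h100:8")         # ("h100", 8)
cpu = normalize_range(32.0)            # (32.0, 32.0)
memory = normalize_range((4096, 8192)) # (4096, 8192), in MB
```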

Examples:

Minimal configuration:
    >>> @app.function(gpu="a100")
    ... def f(x: int) -> int:
    ...     return x * 2

Full configuration with volumes and env:
    >>> @app.function(
    ...     gpu="h100:8",
    ...     cpu=32.0,
    ...     memory=131072,
    ...     image="nvcr.io/nvidia/pytorch:23.10-py3",
    ...     volumes={
    ...         "/data": {"name": "datasets", "size_gb": 500},
    ...         "/ckpts": "model-checkpoints",
    ...     },
    ...     environment={"NCCL_DEBUG": "INFO"},
    ... )
    ... def train(cfg: str) -> dict:
    ...     return {"status": "ok"}

RemoteFunction

Wraps a Python function to support .remote() GPU execution.

Local calls execute the function as-is; .remote() runs it remotely with an isolated environment and returns the JSON-serializable result.
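The difference between a local call and .remote() can be sketched with a local stand-in. SketchRemoteFunction below is hypothetical and is not the SDK's RemoteFunction. It emulates the remote path by round-tripping the result through JSON, and it assumes that max_result_size is checked against the encoded byte length, which the source does not spell out.

```python
import json
from typing import Any, Callable


class SketchRemoteFunction:
    """Local stand-in for illustration; not the SDK's RemoteFunction."""

    def __init__(self, fn: Callable[..., Any], max_result_size: int = 10485760) -> None:
        self._fn = fn
        self._max_result_size = max_result_size

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Local call: run the wrapped function as-is.
        return self._fn(*args, **kwargs)

    def remote(self, *args: Any, **kwargs: Any) -> Any:
        # Emulated remote call: the result is serialized to JSON and decoded,
        # so only JSON-serializable values survive the trip.
        payload = json.dumps(self._fn(*args, **kwargs)).encode("utf-8")
        if len(payload) > self._max_result_size:
            raise ValueError("serialized result exceeds max_result_size")
        return json.loads(payload)


def pair(x: int) -> tuple:
    return (x, x * 2)


wrapped = SketchRemoteFunction(pair)
local_result = wrapped(3)          # (3, 6): the tuple is returned unchanged
remote_result = wrapped.remote(3)  # [3, 6]: JSON has no tuple type
```

In this sketch a tuple comes back as a list after the JSON round trip, which is one reason to return plain dicts, lists, strings, and numbers from functions that will be called remotely.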

RemoteFunction.remote

remote(self, *args: P.args, **kwargs: P.kwargs) -> R

Execute remotely on GPU and return the JSON-serializable result.

Parameters:

  • *args: Positional arguments for the wrapped function.

  • **kwargs: Keyword arguments for the wrapped function.

Returns:

R: The function's return value.

Examples:

Simple remote call:
    >>> @app.function(gpu="a100")
    ... def add(x: int, y: int) -> int:
    ...     return x + y
    >>> add.remote(2, 3)

Configure retries and timeout:
    >>> @app.function(gpu="a100", retries=3, timeout=600)
    ... def train(cfg_path: str) -> dict:
    ...     return {"ok": True}
    >>> train.remote("config.yaml")

RemoteFunction.spawn

spawn(self, *args: P.args, **kwargs: P.kwargs) -> Task

Submit for async execution and return the Task immediately.

Parameters:

  • *args: Positional arguments for the wrapped function.

  • **kwargs: Keyword arguments for the wrapped function.

Returns:

Task: Handle that can be used to monitor execution.

Examples:

Launch eight parallel tasks, then wait for each one and print its last 10 log lines:
    >>> tasks = [process.spawn(i) for i in range(8)]
    >>> for t in tasks:
    ...     t.wait()
    ...     print(t.logs(tail=10))
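The submit-now, collect-later pattern behind spawn can be tried locally without the SDK. The sketch below uses the standard library's concurrent.futures as a stand-in: pool.submit plays the role of spawn, and Future.result plays the role of waiting on a Task. It is an analogy only. The process function is hypothetical, and the real Task API may differ.

```python
from concurrent.futures import ThreadPoolExecutor


def process(i: int) -> int:
    # Hypothetical workload standing in for a remote function body.
    return i * i


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a handle immediately, much like spawn() returning a Task.
    futures = [pool.submit(process, i) for i in range(8)]
    # result() blocks until that particular piece of work has finished.
    results = [f.result() for f in futures]
```

Collecting results in submission order keeps the output aligned with the inputs even when the work finishes out of order.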
