LangChain Reference
    Function • Since v0.2

    task

    task(
      __func_or_none__: Callable[P, Awaitable[T]] | Callable[P, T] | None = None,
      *,
      name: str | None = None,
      retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
      cache_policy: CachePolicy[Callable[P, str | bytes]] | None = None,
      timeout: float | timedelta | TimeoutPolicy | None = None,
      **kwargs: Unpack[DeprecatedKwargs] = {}
    ) -> Callable[[Callable[P, Awaitable[T]] | Callable[P, T]], _TaskFunction[P, T]] | _TaskFunction[P, T]

    Used in Docs

    • Choosing between Graph and Functional APIs
    • Deploy other frameworks
    • Fault tolerance
    • Functional API overview
    • Quickstart

    Parameters

    name: str | None
    Default: None

    An optional name for the task. If not provided, the function name will be used.

    retry_policy: RetryPolicy | Sequence[RetryPolicy] | None
    Default: None

    An optional retry policy (or list of policies) to use for the task in case of a failure.

    cache_policy: CachePolicy[Callable[P, str | bytes]] | None
    Default: None

    timeout: float | timedelta | TimeoutPolicy | None
    Default: None

    Define a LangGraph task using the task decorator.

    Requires Python 3.11 or higher for async functions

    The task decorator supports both sync and async functions. To use async functions, ensure that you are using Python 3.11 or higher.

    Tasks can only be called from within an entrypoint or from within a StateGraph. A task can be called like a regular function with the following differences:

    • When a checkpointer is enabled, the function inputs and outputs must be serializable.
    • The decorated function can only be called from within an entrypoint or StateGraph.
    • Calling the function produces a future. This makes it easy to parallelize tasks.

    Sync Task:

    from langgraph.func import entrypoint, task
    
    @task
    def add_one_task(a: int) -> int:
        return a + 1
    
    @entrypoint()
    def add_one(numbers: list[int]) -> list[int]:
        futures = [add_one_task(n) for n in numbers]
        results = [f.result() for f in futures]
        return results
    
    # Call the entrypoint
    add_one.invoke([1, 2, 3])  # Returns [2, 3, 4]

    Async Task:

    import asyncio
    from langgraph.func import entrypoint, task
    
    @task
    async def add_one_task(a: int) -> int:
        return a + 1
    
    @entrypoint()
    async def add_one(numbers: list[int]) -> list[int]:
        futures = [add_one_task(n) for n in numbers]
        return await asyncio.gather(*futures)
    
    # Call the entrypoint (top-level await needs an async context, such as a notebook)
    await add_one.ainvoke([1, 2, 3])  # Returns [2, 3, 4]

    cache_policy: An optional cache policy to use for the task. This allows caching of the task results.

    timeout: Timeout for each task attempt. A number or timedelta is a hard wall-clock cap and is not refreshed. Use TimeoutPolicy to configure both a wall-clock run_timeout and an idle_timeout refreshed by progress signals. For long-running work that doesn't naturally emit progress, call runtime.heartbeat() from inside the task. When the timeout fires, NodeTimeoutError is raised and the retry policy (if any) decides whether to retry. Supported only for async tasks; sync tasks cannot be safely cancelled in-process.
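
The difference between a fixed wall-clock cap and an idle timeout refreshed by progress signals can be sketched with plain asyncio. This is not LangGraph's implementation and uses none of its APIs: Heartbeat, run_with_timeouts, chatty, silent, and outcome are hypothetical names, and the built-in TimeoutError stands in for NodeTimeoutError.

```python
import asyncio
import time


class Heartbeat:
    """Hypothetical stand-in for a progress signal; not a LangGraph class."""

    def __init__(self) -> None:
        self.last = time.monotonic()

    def beat(self) -> None:
        self.last = time.monotonic()


async def run_with_timeouts(work, *, run_timeout: float, idle_timeout: float):
    """Run work(hb) under a fixed overall cap and a refreshable idle cap."""
    hb = Heartbeat()
    job = asyncio.create_task(work(hb))
    deadline = time.monotonic() + run_timeout  # fixed; never refreshed
    try:
        while not job.done():
            now = time.monotonic()
            if now >= deadline:
                raise TimeoutError("run_timeout exceeded")
            if now - hb.last >= idle_timeout:  # reset by every hb.beat()
                raise TimeoutError("idle_timeout exceeded")
            await asyncio.wait({job}, timeout=0.01)
        return job.result()
    finally:
        if not job.done():
            # Async work can be cancelled cooperatively; a blocking sync
            # function running in a thread could not be stopped this way.
            job.cancel()
            await asyncio.gather(job, return_exceptions=True)


async def chatty(hb: Heartbeat) -> str:
    # Emits a progress signal after every short step.
    for _ in range(5):
        await asyncio.sleep(0.02)
        hb.beat()
    return "done"


async def silent(hb: Heartbeat) -> str:
    # Never signals progress, so the idle timeout fires first.
    await asyncio.sleep(1.0)
    return "done"


async def outcome(work) -> str:
    try:
        return await run_with_timeouts(work, run_timeout=5.0, idle_timeout=0.3)
    except TimeoutError as exc:
        return str(exc)


chatty_result = asyncio.run(outcome(chatty))
silent_result = asyncio.run(outcome(silent))
print(chatty_result, "|", silent_result)
```

In this sketch the task that keeps signalling progress completes, while the silent one is cancelled once the idle window elapses, even though the overall cap is far away.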
