TaskFlow

Orchestrate multi step AI pipelines using composable agents and structured execution logic.

TaskFlow

TaskFlow is the orchestration engine of Chainless. It connects agents into deterministic multi step workflows and gives you full control over execution order, input routing, parallel execution, retry logic, and lifecycle events.

TaskFlow is suitable for:

  • Multi agent pipelines
  • Customer support flows
  • Data extraction or transformation steps
  • Tool augmented reasoning pipelines
  • Agent microservices exposed over HTTP

Core Idea

A TaskFlow defines:

  1. A set of registered agents
  2. A sequence of steps that use those agents
  3. Optional parallel groups
  4. Input mapping rules that pass data between steps
  5. Optional conditions, timeouts, and retry logic
  6. Optional callbacks for monitoring

At runtime, TaskFlow executes each step, resolves placeholders like {{input}} or {{StepName.output.someField}}, and produces a final structured output.


Basic Usage

from chainless import TaskFlow, Agent

agent = Agent(
    name="EchoAgent",
    system_prompt="Echo the input"
)

flow = TaskFlow("EchoFlow")
flow.add_agent("echo", agent)

flow.step("echo", input_map={"input": "{{input}}"})

result = flow.run("Hello")
print(result.output)

Creating a TaskFlow

flow = TaskFlow(
    name="SupportFlow",
    verbose=True,
    retry_on_fail=1
)
ParameterDescription
nameUnique flow name
verboseEnables logging during execution
retry_on_failGlobal retry count for failed steps
on_step_startGlobal callback before each step
on_step_completeGlobal callback after each step
on_step_errorGlobal callback when a step fails

Registering Agents

Use add_agent to add an agent with a unique name.

flow.add_agent("Classifier", classifier_agent)
flow.add_agent("Solution", solution_agent)
flow.add_agent("Report", report_agent)

An agent must implement either a start() method or a run() method.


Adding Steps

Steps define the execution flow. Each step references an agent and provides an input mapping.

flow.step(
    "Classifier",
    input_map={"input": "{{input}}"}
)

Input Mapping

TaskFlow automatically resolves placeholders:

PlaceholderMeaning
{{input}}The initial input passed to the flow
{{StepName.output}}Full output of a previous step
{{StepName.output.someField}}A specific field in the structured output
{{aliasName}}Custom alias defined using flow.alias()

Example:

flow.step(
    "Solution",
    step_name="SolutionStep",
    input_map={
        "category": "{{Classifier.output.category}}",
        "details": "{{input}}"
    }
)

Using Prompt Templates

You can provide custom prompt templates that inject values dynamically.

flow.step(
    "Solution",
    step_name="SolutionStep",
    input_map={"category": "{{Classifier.output.category}}"},
    prompt_template="""
Category: {{category}}
Please generate an appropriate solution.
"""
)

The template is passed directly to the agent.


Aliases

Aliases help reuse specific output fields without repeating long expressions.

flow.alias("category", from_step="Classifier", key="category")

flow.step(
    "Solution",
    input_map={"category": "{{category}}"}
)

Conditional Execution

You can skip a step based on a condition function.

def skip_if_not_technical(steps):
    return steps["Classifier"].output["category"] == "technical"

flow.step(
    "TechnicalAgent",
    input_map={"input": "{{input}}"},
    condition=skip_if_not_technical
)

If the condition returns False, the step is skipped.


Dependencies Between Steps

You can force a step to run only after certain steps have completed.

flow.step(
    "Translator",
    input_map={"text": "{{Summary.output.text}}"},
    depends_on=["Summary"]
)

This is useful for controlling order when combined with parallel groups.


Parallel Execution

Steps can run concurrently using parallel.

flow.parallel([
    "SummarizerStep",
    "SentimentStep"
])

Each step must already be defined using flow.step().

When reached during execution, all steps in the group run at the same time. Errors inside a parallel group are collected and raised together.


Timeouts and Retry Logic

Each step can override the global retry or timeout.

flow.step(
    "SlowAgent",
    input_map={"input": "{{input}}"},
    timeout=10,
    retry_on_fail=2
)

Timeout is in seconds.

Retry logic restarts the step if the agent raises an exception.


Step Callbacks

Step specific callbacks can be attached.

def before(name, input):
    print(f"Starting step {name}")

def after(name, output):
    print(f"Completed step {name}")

def on_err(name, err):
    print(f"Error in {name}: {err}")

flow.step(
    "SomeAgent",
    input_map={"input": "{{input}}"},
    on_start=before,
    on_complete=after,
    on_error=on_err
)

Callbacks can inspect or modify runtime behavior.


Running a Flow

Synchronous

result = flow.run("Hello world")
print(result.output)

Asynchronous

result = await flow.run_async("Hello world")

Result structure:

{
    "flow": {
        "steps": {
          "StepName": { "output": ... },
           ...
     },
        "usage_summary": {
          "total_requests": ...,
          "total_tokens": ...,
     }
    },
    "output": <final value>
}

Serving TaskFlows as API Endpoints

You can expose any TaskFlow as an HTTP endpoint using FlowServer.

from chainless.exp.server import FlowServer

endpoint = flow.serve(path="/support", name="Support API")
server = FlowServer([endpoint], port=8080)
server.run()

Each request runs the flow and returns the final structured output.


Full Example

support_flow = TaskFlow("SupportFlow", verbose=True)

support_flow.add_agent("Classifier", classifier_agent)
support_flow.add_agent("Solution", solution_agent)
support_flow.add_agent("Report", report_agent)

support_flow.step(
    "Classifier",
    input_map={"input": "{{input}}"}
)

support_flow.step(
    "Solution",
    step_name="SolutionStep",
    input_map={
        "category": "{{Classifier.output.category}}",
        "details": "{{input}}"
    },
    prompt_template="""
Category: {{category}}
Details: {{details}}
Generate a solution.
"""
)

support_flow.step(
    "Report",
    input_map={
        "category": "{{Classifier.output.category}}",
        "solution": "{{SolutionStep.output.solution}}"
    }
)

result = support_flow.run("Email is not working")
print(result.output)

Summary

TaskFlow provides a structured and predictable way to orchestrate multi agent logic without hidden state or opaque control flow. It offers:

  • Step based pipelines
  • Dynamic input mapping
  • Optional prompt templating
  • Parallel execution
  • Conditional steps
  • Retry and timeout logic
  • Lifecycle callbacks
  • API server integration

You control the flow. Chainless executes it cleanly and reliably.