TaskFlow
Orchestrate multi step AI pipelines using composable agents and structured execution logic.
TaskFlow
TaskFlow is the orchestration engine of Chainless. It connects agents into deterministic multi step workflows and gives you full control over execution order, input routing, parallel execution, retry logic, and lifecycle events.
TaskFlow is suitable for:
- Multi agent pipelines
- Customer support flows
- Data extraction or transformation steps
- Tool augmented reasoning pipelines
- Agent microservices exposed over HTTP
Core Idea
A TaskFlow defines:
- A set of registered agents
- A sequence of steps that use those agents
- Optional parallel groups
- Input mapping rules that pass data between steps
- Optional conditions, timeouts, and retry logic
- Optional callbacks for monitoring
At runtime, TaskFlow executes each step, resolves placeholders like {{input}} or {{StepName.output.someField}}, and produces a final structured output.
Basic Usage
from chainless import TaskFlow, Agent
agent = Agent(
name="EchoAgent",
system_prompt="Echo the input"
)
flow = TaskFlow("EchoFlow")
flow.add_agent("echo", agent)
flow.step("echo", input_map={"input": "{{input}}"})
result = flow.run("Hello")
print(result.output)Creating a TaskFlow
flow = TaskFlow(
name="SupportFlow",
verbose=True,
retry_on_fail=1
)| Parameter | Description |
|---|---|
name | Unique flow name |
verbose | Enables logging during execution |
retry_on_fail | Global retry count for failed steps |
on_step_start | Global callback before each step |
on_step_complete | Global callback after each step |
on_step_error | Global callback when a step fails |
Registering Agents
Use add_agent to add an agent with a unique name.
flow.add_agent("Classifier", classifier_agent)
flow.add_agent("Solution", solution_agent)
flow.add_agent("Report", report_agent)An agent must implement either a start() method or a run() method.
Adding Steps
Steps define the execution flow. Each step references an agent and provides an input mapping.
flow.step(
"Classifier",
input_map={"input": "{{input}}"}
)Input Mapping
TaskFlow automatically resolves placeholders:
| Placeholder | Meaning |
|---|---|
{{input}} | The initial input passed to the flow |
{{StepName.output}} | Full output of a previous step |
{{StepName.output.someField}} | A specific field in the structured output |
{{aliasName}} | Custom alias defined using flow.alias() |
Example:
flow.step(
"Solution",
step_name="SolutionStep",
input_map={
"category": "{{Classifier.output.category}}",
"details": "{{input}}"
}
)Using Prompt Templates
You can provide custom prompt templates that inject values dynamically.
flow.step(
"Solution",
step_name="SolutionStep",
input_map={"category": "{{Classifier.output.category}}"},
prompt_template="""
Category: {{category}}
Please generate an appropriate solution.
"""
)The template is passed directly to the agent.
Aliases
Aliases help reuse specific output fields without repeating long expressions.
flow.alias("category", from_step="Classifier", key="category")
flow.step(
"Solution",
input_map={"category": "{{category}}"}
)Conditional Execution
You can skip a step based on a condition function.
def skip_if_not_technical(steps):
return steps["Classifier"].output["category"] == "technical"
flow.step(
"TechnicalAgent",
input_map={"input": "{{input}}"},
condition=skip_if_not_technical
)If the condition returns False, the step is skipped.
Dependencies Between Steps
You can force a step to run only after certain steps have completed.
flow.step(
"Translator",
input_map={"text": "{{Summary.output.text}}"},
depends_on=["Summary"]
)This is useful for controlling order when combined with parallel groups.
Parallel Execution
Steps can run concurrently using parallel.
flow.parallel([
"SummarizerStep",
"SentimentStep"
])Each step must already be defined using flow.step().
When reached during execution, all steps in the group run at the same time. Errors inside a parallel group are collected and raised together.
Timeouts and Retry Logic
Each step can override the global retry or timeout.
flow.step(
"SlowAgent",
input_map={"input": "{{input}}"},
timeout=10,
retry_on_fail=2
)Timeout is in seconds.
Retry logic restarts the step if the agent raises an exception.
Step Callbacks
Step specific callbacks can be attached.
def before(name, input):
print(f"Starting step {name}")
def after(name, output):
print(f"Completed step {name}")
def on_err(name, err):
print(f"Error in {name}: {err}")
flow.step(
"SomeAgent",
input_map={"input": "{{input}}"},
on_start=before,
on_complete=after,
on_error=on_err
)Callbacks can inspect or modify runtime behavior.
Running a Flow
Synchronous
result = flow.run("Hello world")
print(result.output)Asynchronous
result = await flow.run_async("Hello world")Result structure:
{
"flow": {
"steps": {
"StepName": { "output": ... },
...
},
"usage_summary": {
"total_requests": ...,
"total_tokens": ...,
}
},
"output": <final value>
}Serving TaskFlows as API Endpoints
You can expose any TaskFlow as an HTTP endpoint using FlowServer.
from chainless.exp.server import FlowServer
endpoint = flow.serve(path="/support", name="Support API")
server = FlowServer([endpoint], port=8080)
server.run()Each request runs the flow and returns the final structured output.
Full Example
support_flow = TaskFlow("SupportFlow", verbose=True)
support_flow.add_agent("Classifier", classifier_agent)
support_flow.add_agent("Solution", solution_agent)
support_flow.add_agent("Report", report_agent)
support_flow.step(
"Classifier",
input_map={"input": "{{input}}"}
)
support_flow.step(
"Solution",
step_name="SolutionStep",
input_map={
"category": "{{Classifier.output.category}}",
"details": "{{input}}"
},
prompt_template="""
Category: {{category}}
Details: {{details}}
Generate a solution.
"""
)
support_flow.step(
"Report",
input_map={
"category": "{{Classifier.output.category}}",
"solution": "{{SolutionStep.output.solution}}"
}
)
result = support_flow.run("Email is not working")
print(result.output)Summary
TaskFlow provides a structured and predictable way to orchestrate multi agent logic without hidden state or opaque control flow. It offers:
- Step based pipelines
- Dynamic input mapping
- Optional prompt templating
- Parallel execution
- Conditional steps
- Retry and timeout logic
- Lifecycle callbacks
- API server integration
You control the flow. Chainless executes it cleanly and reliably.