Overview

TaskFlow is a flexible, agent-based orchestration utility designed for chaining and executing multiple AI agents in a structured, sequential, or parallel fashion.

It supports:

  • Step-based and parallel execution of agents
  • Dynamic input mapping with references to prior outputs
  • Retry policies per step or globally
  • Output tracking and structured result return
  • Optional callbacks on step completion

TaskFlow is a powerful orchestration engine designed to coordinate multiple AI agents and tools in a structured sequence or parallel execution pattern. It enables complex LLM pipelines with dynamic input resolution, retries, and step-by-step control.


✨ Features

  • Step-by-step agent orchestration
  • Input templating using {{agent_name.key}}
  • Parallel execution support
  • Retry policy for each step
  • Optional callbacks after steps
  • Full flow trace access

🧠 Example Use Case

Let’s define a flow to extract information and summarize it.

1. Define Agents

from chainless import Agent, Tool

def fetch_data(topic: str):
    return f"Data about {topic}"

fetch_tool = Tool("Fetcher", "Fetches raw info", fetch_data)

fetch_agent = Agent(
    name="FetcherAgent",
    llm=llm,
    tools=[fetch_tool],
    system_prompt="Use the tool to fetch data about a topic."
)

summary_agent = Agent(
    name="SummaryAgent",
    llm=llm,
    system_prompt="Summarize the input in 3 lines."
)

2. Compose the TaskFlow

from chainless import TaskFlow

flow = TaskFlow(name="InfoFlow")

flow.add_agent("fetch", fetch_agent)
flow.add_agent("summary", summary_agent)

flow.step("fetch", {"topic": "{{input}}"})
flow.step("summary", {"input": "{{fetch.output}}"})

3. Run It

result = flow.run("quantum computing")
print(result["output"])

🧩 Dynamic Input Mapping

You can reference any previous step’s output using:

  • {{input}}: Original input
  • {{agent_name.key}}: Output from a specific agent

Example:

flow.step("analyze", {"text": "{{fetch.result}}"})

πŸ” Retry on Failure

Set retry policy for resilience:

flow.step("summary", {"input": "{{fetch.output}}"}, retry_on_fail=2)

Or globally:

flow = TaskFlow(name="RobustFlow", retry_on_fail=3)

⚑ Parallel Execution

Run steps concurrently if independent:

flow.parallel(["fetch_news", "fetch_social"])

πŸ”” Callbacks

Hook into each step:

def log_step(name, output):
    print(f"[{name}] -> {output}")

flow = TaskFlow(name="LoggedFlow", on_step_complete=log_step)

πŸ§ͺ Output Structure

{
  "flow": {
    "fetch": { "output": "Data about quantum computing" },
    "summary": { "output": "Quantum computing is a..." }
  },
  "output": "Quantum computing is a..."
}

βœ… Best Practices

  • Keep agents focused (1 purpose per agent)
  • Reuse tools through agents
  • Use retries on weak/volatile steps
  • Combine TaskFlow with UI or APIs

πŸ›  Tool-Based Agent Example

from chainless import Tool, Agent

add_tool = Tool("Adder", "Adds two numbers", lambda a, b: a + b)

math_agent = Agent(
    name="MathAgent",
    llm=llm,
    tools=[add_tool],
    system_prompt="Use the Adder tool to calculate the sum."
)

flow = TaskFlow("MathFlow")
flow.add_agent("math", math_agent)
flow.step("math", {"a": 5, "b": 10})

πŸ“˜ Interface Summary (TS)

interface TaskFlow {
  name: string;
  add_agent(name: string, agent: AgentProtocol): void;
  step(agent_name: string, input_map: Record<string, any>, retry_on_fail?: number): void;
  parallel(agent_names: string[]): void;
  run(user_input: string): {
    flow: Record<string, any>;
    output: any;
  };
}

🎯 Use Case Ideas

Use CaseDescription
Research BotSearch β†’ summarize β†’ report
Support AssistantClassify β†’ fetch KB β†’ reply
Market AnalysisScrape β†’ analyze β†’ visualize
Workflow EngineMulti-agent automation with retries

TaskFlow brings composability and robustness to AI-based systems. Use it to turn isolated agents into scalable intelligent workflows.