FlowServer

Build and run LLM agents with tools, hooks, and structured outputs.

FlowServer

FlowServer is a lightweight HTTP server used to expose TaskFlow pipelines as REST API endpoints with zero boilerplate.
It automatically handles:

  • API key authentication
  • Request validation
  • Flow execution (sync or async)
  • Timeout management
  • Structured logging
  • Success & error responses
  • Health checks

FlowServer integrates directly with TaskFlow.serve(), making it extremely simple to publish an AI workflow as an API endpoint.


Installation

FlowServer is included in Chainless:

from chainless.exp.server import FlowServer

Exposing a TaskFlow

The typical workflow is:

  1. Build your TaskFlow
  2. Add agents, steps, tools, etc.
  3. Call .serve(path, name) → returns a FlowEndpoint
  4. Pass that endpoint into FlowServer

Basic Usage

support_endpoint = support_flow.serve(
    path="/support",
    name="Support Flow"
)

server = FlowServer(
    endpoints=[support_endpoint],
    port=8080,
    api_key="example_key"
)

server.run()

This exposes your entire AI workflow as:

POST /support
Authorization: Bearer example_key

Request Format

FlowServer accepts a standard FlowRequest:

{
  "input": "Your input"
}

Response Format

Successful responses follow SuccessResponse:

{
  "success": true,
  "flow": "Support Flow",
  "output": "final TaskFlow output",
  "trace_id": "trace_id",
  "duration": 1.482
}

Errors follow ErrorResponse:

{
  "success": false,
  "code": "INVALID_INPUT",
  "message": "Missing 'input' in request body.",
  "details": null,
  "trace_id": "uuid"
}

Full Real Example (from your project)

Below is the real, canonical FlowServer example from your implementation.

It demonstrates:

  • Tools
  • Agents with response models
  • A multi-step TaskFlow
  • .serve() producing a FlowEndpoint
  • FlowServer running the endpoint

Tools

from chainless import Tool
from enum import Enum
import asyncio, random

class SystemName(str, Enum):
    MAIL_SERVER = "mail_server"
    PAYMENT_GATEWAY = "payment_gateway"
    DATABASE = "database"

def check_system_status(system_name: SystemName):
    statuses = {
        SystemName.MAIL_SERVER: "Mail server OK.",
        SystemName.PAYMENT_GATEWAY: "Payment gateway stable.",
        SystemName.DATABASE: "Database performing well.",
    }
    return statuses.get(system_name, "Unknown system.")

async def get_user_account_info(user_id: str):
    plans = ["Free", "Premium", "Enterprise"]
    status = random.choice(["active", "suspended", "frozen"])
    await asyncio.sleep(0.1)
    return f"User {user_id} is {status} on {random.choice(plans)} plan."

status_tool = Tool("SystemStatusTool", "Checks system health.", check_system_status)
account_tool = Tool("UserAccountTool", "Fetches user account info.", get_user_account_info)

Agents

from chainless import Agent
from pydantic import BaseModel

class ClassifierOutput(BaseModel):
    category: str
    reason: str

class SolutionOutput(BaseModel):
    solution: str

classifier_agent = Agent(
    name="IssueClassifier",
    system_prompt="Classify user issue.",
    response_format=ClassifierOutput,
)

solution_agent = Agent(
    name="SolutionGenerator",
    system_prompt="Generate solution.",
    tools=[status_tool, account_tool],
    response_format=SolutionOutput,
)

report_agent = Agent(
    name="SupportReportAgent",
    system_prompt="Generate final support report."
)

TaskFlow

from chainless import TaskFlow

support_flow = TaskFlow("SupportFlow", verbose=True)
support_flow.add_agent("Classifier", classifier_agent)
support_flow.add_agent("Solution", solution_agent)
support_flow.add_agent("Report", report_agent)

support_flow.step("Classifier", input_map={"input": "{{input}}"})

support_flow.step(
    "Solution",
    step_name="SolutionStep1",
    input_map={"category": "{{Classifier.output.category}}", "details": "{{input}}"},
    prompt_template="""
{{category}} support issue.
Details: {{details}}

Use tools if needed:
1. SystemStatusTool
2. UserAccountTool
"""
)

support_flow.step(
    "Report",
    input_map={
        "category": "{{Classifier.output.category}}",
        "solution": "{{SolutionStep1.output.solution}}",
    },
    prompt_template="""
Support Report:
Category: {{category}}
Solution: {{solution}}
"""
)

Serving the Flow via FlowServer

from chainless.exp.server import FlowServer

support_endpoint = support_flow.serve(path="/support", name="Support Flow")

server = FlowServer(
    endpoints=[support_endpoint],
    port=8080,
    api_key="example_key"
)

if __name__ == "__main__":
    server.run()

This will start:

POST http://localhost:8080/support
Authorization: Bearer example_key

Example Request

curl -X POST http://localhost:8080/support \
  -H "Authorization: Bearer example_key" \
  -H "Content-Type: application/json" \
  -d '{
        "input": "USER_ID: 12345 MESSAGE: Hello My Name Onur, Why My Account freezed"
      }'

Example Final Flow Output (inside success response)

{
  "category": "payment",
  "solution": "Payment gateway is operational. Please try again.",
  "report": "Final generated support report..."
}

Health Check

GET /healthz

returns:

{
  "status": "ok",
  "uptime_seconds": 21.23,
  "version": "0.1.x",
  "endpoints": 1
}

Summary

FlowServer allows you to:

  • Build complex TaskFlows
  • Attach agents, prompts, templates, tools
  • Expose the entire pipeline as an HTTP endpoint
  • Receive structured request → run flow → return final output
  • Handle errors uniformly
  • Monitor execution with trace IDs

It is the easiest way to publish AI workflows as production-ready APIs.