Agent Orchestration and Workflows

Agent orchestration is the process of coordinating multiple AI agents, tools, and steps to complete complex, end-to-end workflows. The orchestrator is the central component that decides what should happen, when it should happen, and which agent or tool should do it.

As AI agent systems grow beyond a single agent, the quality of the orchestration largely determines whether the system works reliably in production.

What Is an Orchestration Workflow?

A workflow is a defined sequence (or graph) of steps that a system takes to complete a task. In the context of AI Agents, workflows can be:

  • Linear: Steps run one after another in a fixed order
  • Conditional: Different paths are taken based on conditions
  • Parallel: Multiple steps run at the same time
  • Loop-based: Steps repeat until a condition is met
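
The linear and loop-based shapes in the list above can be sketched with plain functions before any LLM is involved. This is a minimal illustration, not part of the orchestrator built later: run_linear, run_loop, and the max_iters cap are hypothetical names chosen for this example.

```python
from typing import Callable, List

# A step is any function that takes the current data and returns new data.
Step = Callable[[str], str]


def run_linear(steps: List[Step], data: str) -> str:
    """Linear: each step receives the previous step's output, in a fixed order."""
    for step in steps:
        data = step(data)
    return data


def run_loop(step: Step, done: Callable[[str], bool], data: str,
             max_iters: int = 10) -> str:
    """Loop-based: repeat one step until `done` is true or the cap is reached."""
    for _ in range(max_iters):
        if done(data):
            break
        data = step(data)
    return data
```

For example, run_linear([str.strip, str.upper], "  hello ") returns "HELLO". The iteration cap in run_loop is a design choice: without it, a loop whose exit condition is never met (for instance, an LLM that never produces an acceptable draft) would run forever.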

A Real-World Orchestration Example

Task: "Analyse a company's financial report and generate an investor summary"

START
  │
  ▼
[Step 1] Document Loader Agent
  → Load the PDF financial report
  ↓
[Step 2] Extraction Agent
  → Extract key financial data: revenue, profit, expenses, growth %
  ↓
[Step 3 - Parallel] ─────────────────────────────────────
  [Trend Analyst Agent]    [Risk Analyst Agent]
  → Identify growth trends  → Identify financial risks
  ─────────────────────────────────────────────────────
  ↓ (merge results)
[Step 4] Report Writer Agent
  → Combine all insights into a concise investor summary
  ↓
[Step 5] Formatter Agent
  → Format the summary as a clean PDF or email-ready text
  ↓
END — Final investor summary delivered
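
Step 3 in the diagram runs two analysts side by side and then merges their results. A minimal sketch of that fan-out and merge follows, assuming Python's standard concurrent.futures module. The functions analyse_trends and analyse_risks are hypothetical stand-ins for the two agents (real agents would call an LLM), and the input field names are assumptions made for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def analyse_trends(data: dict) -> str:
    # Hypothetical stand-in for the Trend Analyst Agent.
    return f"Revenue growth of {data['growth_pct']}% year over year"


def analyse_risks(data: dict) -> str:
    # Hypothetical stand-in for the Risk Analyst Agent.
    if data["expense_growth_pct"] > data["profit_growth_pct"]:
        return "Expenses growing faster than profit"
    return "No major risk flagged"


def run_parallel_step(data: dict) -> dict:
    """Run both analysts concurrently, then merge their outputs into one dict."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        trend_future = pool.submit(analyse_trends, data)
        risk_future = pool.submit(analyse_risks, data)
        # .result() blocks until each analyst has finished.
        return {"trends": trend_future.result(), "risks": risk_future.result()}
```

Threads are a reasonable fit here because agent calls are usually I/O-bound (waiting on a network response), so the two analysts can overlap their waiting time. The two steps must not depend on each other's output for this to be safe.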

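Building an Orchestration System

The code below implements a simplified, purely sequential variant of this workflow: it takes the report as plain text (no PDF loading), runs four steps in order (data extraction, sentiment analysis, risk identification, summary writing), and passes a shared state object between them.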

# orchestrator.py

import os
import json
from typing import Any
from dotenv import load_dotenv
import openai

load_dotenv()
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


# ─── Step Registry ────────────────────────────────────────────────
class WorkflowStep:
    def __init__(self, name: str, description: str, function):
        self.name        = name
        self.description = description
        self.function    = function

    def run(self, input_data: Any) -> Any:
        print(f"\n⚙️  Running: {self.name}")
        result = self.function(input_data)
        print(f"✅  {self.name} complete")
        return result


# ─── Workflow State ───────────────────────────────────────────────
class WorkflowState:
    """Holds shared state that passes between steps."""
    def __init__(self, initial_input: Any):
        self.original_input = initial_input
        self.step_results   = {}
        self.current_step   = 0

    def add_result(self, step_name: str, result: Any):
        self.step_results[step_name] = result

    def get_result(self, step_name: str) -> Any:
        return self.step_results.get(step_name)

    def get_all_context(self) -> str:
        lines = [f"Original Input: {self.original_input}\n"]
        for step, result in self.step_results.items():
            lines.append(f"[{step}]: {str(result)[:300]}")
        return "\n".join(lines)


# ─── LLM Helper ───────────────────────────────────────────────────
def llm_call(system: str, user: str, max_tokens: int = 800) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system},
            {"role": "user",   "content": user}
        ],
        temperature=0.3,
        max_tokens=max_tokens
    )
    # message.content can be None (e.g. for a refusal); normalise to a string
    return response.choices[0].message.content or ""


# ─── Define Individual Workflow Functions ─────────────────────────
def extract_key_data(state: WorkflowState) -> dict:
    """Extract key data points from the input document."""
    result = llm_call(
        system="You are a data extraction specialist. Extract key facts from documents.",
        user=f"""Extract the following from this document:
- Main topic
- Key numbers or statistics (at least 3)
- Main conclusions

Document: {state.original_input}

Respond as JSON: {{"topic": "...", "statistics": [...], "conclusions": [...]}}"""
    )
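    try:
        # Models often wrap JSON in Markdown code fences; strip them before parsing
        return json.loads(result.replace("```json", "").replace("```", "").strip())
    except json.JSONDecodeError:
        # Fall back to the raw text so later steps still have something to work with
        return {"raw": result}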


def analyse_sentiment(state: WorkflowState) -> str:
    """Analyse the overall tone/sentiment of the content."""
    extracted = state.get_result("extract_key_data")
    return llm_call(
        system="You are a sentiment analysis expert.",
        user=f"""Analyse the sentiment of this data:
{json.dumps(extracted, indent=2, ensure_ascii=False)}

Is the overall tone positive, negative, or neutral? 
Explain in 2-3 sentences."""
    )


def identify_risks(state: WorkflowState) -> list:
    """Identify potential risks or concerns mentioned."""
    result = llm_call(
        system="You are a risk assessment analyst.",
        user=f"""Based on this data, list the top 3 potential risks or concerns:
{state.get_all_context()}

Return as JSON array: ["Risk 1: ...", "Risk 2: ...", "Risk 3: ..."]"""
    )
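    try:
        # Strip Markdown code fences the model may have added around the JSON array
        return json.loads(result.replace("```json", "").replace("```", "").strip())
    except json.JSONDecodeError:
        # Fall back to a single-item list containing the raw text
        return [result]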


def write_summary(state: WorkflowState) -> str:
    """Write a final executive summary based on all previous steps."""
    return llm_call(
        system="You are an expert business writer who creates clear executive summaries.",
        user=f"""Create a concise executive summary (300 words max) based on this analysis:

{state.get_all_context()}

The summary should include: overview, key findings, sentiment, and top risks.""",
        max_tokens=600
    )


# ─── Orchestrator Class ───────────────────────────────────────────
class WorkflowOrchestrator:
    def __init__(self):
        self.steps = [
            WorkflowStep("extract_key_data",  "Extract key facts",    extract_key_data),
            WorkflowStep("analyse_sentiment", "Analyse sentiment",    analyse_sentiment),
            WorkflowStep("identify_risks",    "Identify risks",       identify_risks),
            WorkflowStep("write_summary",     "Write final summary",  write_summary),
        ]

    def run(self, input_text: str) -> str:
        print(f"\n{'='*55}")
        print("🚀 STARTING WORKFLOW")
        print(f"{'='*55}")

        state = WorkflowState(input_text)

        for step in self.steps:
            result = step.run(state)
            state.add_result(step.name, result)

        final_summary = state.get_result("write_summary")

        print(f"\n{'='*55}")
        print("📄 FINAL SUMMARY:")
        print(f"{'='*55}")
        print(final_summary)

        return final_summary


# Test
if __name__ == "__main__":
    sample_document = """
    Q3 2024 Financial Report — TechCorp India
    
    Revenue grew by 22% year-over-year to ₹5,200 crore.
    Net profit increased to ₹820 crore, a 15% improvement.
    Operating expenses rose by 18% due to hiring and R&D investments.
    
    The company onboarded 45 new enterprise clients this quarter.
    Cloud segment revenue reached ₹1,800 crore — up 38% from last year.
    
    Challenges: global supply chain disruptions affected hardware delivery.
    Currency headwinds reduced dollar-denominated revenue by 3%.
    Competition from global tech firms intensified in the SaaS segment.
    """

    orchestrator = WorkflowOrchestrator()
    orchestrator.run(sample_document)

Conditional Branching in Workflows

Workflows can take different paths based on conditions, much like if-else logic in programming. The example below routes on keywords found in the extracted topic:

def route_by_urgency(state: WorkflowState) -> str:
    """Decide which path the workflow should take."""
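    extracted = state.get_result("extract_key_data")
    # Extraction may fall back to {"raw": ...} or return a null topic;
    # treat any missing or non-dict result as an empty topic.
    topic = str(extracted.get("topic") or "").lower() if isinstance(extracted, dict) else ""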

    if "crisis" in topic or "emergency" in topic:
        return "urgent_path"
    elif "report" in topic or "analysis" in topic:
        return "analysis_path"
    else:
        return "general_path"


class ConditionalOrchestrator:
    def run(self, input_text: str):
        state = WorkflowState(input_text)

        # Step 1: Always extract data first
        result = extract_key_data(state)
        state.add_result("extract_key_data", result)

        # Step 2: Route based on content
        path = route_by_urgency(state)
        print(f"Routing to: {path}")

        if path == "urgent_path":
            # Handle urgent content differently
            print("⚠️ Urgent content detected — escalating...")
        elif path == "analysis_path":
            # Full analysis for reports
            state.add_result("analyse_sentiment", analyse_sentiment(state))
            state.add_result("identify_risks",    identify_risks(state))

        # Every path (including general_path, which adds no extra steps) ends with the summary
        state.add_result("write_summary", write_summary(state))
        return state.get_result("write_summary")
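
As the number of paths grows, the if/elif chain in the orchestrator can become hard to maintain. One common alternative is a dispatch table that maps each path name to an ordered list of steps. The sketch below is self-contained and uses step names only; the "escalate" step and the PATHS, choose_path, and plan_steps names are hypothetical and not part of the code above.

```python
from typing import Dict, List

# Each path maps to the ordered step names it should run.
# "escalate" is a hypothetical step used only for illustration.
PATHS: Dict[str, List[str]] = {
    "urgent_path":   ["escalate", "write_summary"],
    "analysis_path": ["analyse_sentiment", "identify_risks", "write_summary"],
    "general_path":  ["write_summary"],
}


def choose_path(topic: str) -> str:
    """Same keyword rules as route_by_urgency, applied to a plain string."""
    topic = topic.lower()
    if "crisis" in topic or "emergency" in topic:
        return "urgent_path"
    if "report" in topic or "analysis" in topic:
        return "analysis_path"
    return "general_path"


def plan_steps(topic: str) -> List[str]:
    """Look up the step list for a topic instead of branching inline."""
    return PATHS[choose_path(topic)]
```

With this layout, adding a new path means adding one entry to PATHS and one rule to choose_path, while the orchestrator's loop over the planned steps stays unchanged.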

Workflow Patterns Summary

Pattern     | Description                         | When to Use
------------|-------------------------------------|----------------------------------------
Sequential  | Steps run one after another         | Most tasks with a clear order
Parallel    | Multiple steps run at the same time | Independent tasks needing speed
Conditional | Path chosen based on a condition    | Different content types or priorities
Loop        | Steps repeat until condition is met | Refinement tasks, iterative generation
Map-Reduce  | Process many items, merge results   | Batch processing large datasets
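
Map-Reduce is the one pattern in this table that the earlier code does not illustrate. A minimal sketch, assuming simple word and keyword counting as a stand-in for per-document agent work (map_step, reduce_step, and map_reduce are hypothetical names for this example):

```python
from typing import Dict, List


def map_step(document: str) -> Dict[str, int]:
    """Map: process one item independently of all the others."""
    return {
        "words": len(document.split()),
        "mentions_risk": int("risk" in document.lower()),
    }


def reduce_step(partials: List[Dict[str, int]]) -> Dict[str, int]:
    """Reduce: merge the per-item results into a single summary."""
    return {
        "documents": len(partials),
        "words": sum(p["words"] for p in partials),
        "risk_mentions": sum(p["mentions_risk"] for p in partials),
    }


def map_reduce(documents: List[str]) -> Dict[str, int]:
    return reduce_step([map_step(doc) for doc in documents])
```

Because each map_step call is independent, the map phase is also a natural candidate for parallel execution when the per-item work is slow.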

Summary

Agent orchestration is what turns isolated agents into a coordinated system. By defining workflows (sequential, parallel, conditional, or looped), complex tasks can be broken down and handled by the right agent or function at the right time. The WorkflowOrchestrator pattern shown here is a starting point: the same structure of named steps and shared state can be extended from a short pipeline, like the four-step example above, to larger multi-agent systems with many specialised components.
