Agent Orchestration and Workflows
Agent Orchestration is the process of coordinating multiple AI Agents, tools, and steps to complete complex, end-to-end workflows. An orchestrator is the central brain that decides what should happen, when, and who (which agent or tool) should do it.
As AI Agent systems grow beyond a single agent, orchestration becomes the skill that determines whether a system actually works reliably in production.
What Is an Orchestration Workflow?
A workflow is a defined sequence (or graph) of steps that a system takes to complete a task. In the context of AI Agents, workflows can be:
- Linear: Steps run one after another in a fixed order
- Conditional: Different paths are taken based on conditions
- Parallel: Multiple steps run at the same time
- Loop-based: Steps repeat until a condition is met
A Real-World Orchestration Example
Task: "Analyse a company's financial report and generate an investor summary"
START │ ▼ [Step 1] Document Loader Agent → Load the PDF financial report ↓ [Step 2] Extraction Agent → Extract key financial data: revenue, profit, expenses, growth % ↓ [Step 3 - Parallel] ───────────────────────────────────── [Trend Analyst Agent] [Risk Analyst Agent] → Identify growth trends → Identify financial risks ───────────────────────────────────────────────────── ↓ (merge results) [Step 4] Report Writer Agent → Combine all insights into a concise investor summary ↓ [Step 5] Formatter Agent → Format the summary as a clean PDF or email-ready text ↓ END — Final investor summary delivered
Building an Orchestration System
# orchestrator.py
import os
import json
from typing import Any
from dotenv import load_dotenv
import openai
load_dotenv()
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# ─── Step Registry ────────────────────────────────────────────────
class WorkflowStep:
def __init__(self, name: str, description: str, function):
self.name = name
self.description = description
self.function = function
def run(self, input_data: Any) -> Any:
print(f"\n⚙️ Running: {self.name}")
result = self.function(input_data)
print(f"✅ {self.name} complete")
return result
# ─── Workflow State ───────────────────────────────────────────────
class WorkflowState:
"""Holds shared state that passes between steps."""
def __init__(self, initial_input: Any):
self.original_input = initial_input
self.step_results = {}
self.current_step = 0
def add_result(self, step_name: str, result: Any):
self.step_results[step_name] = result
def get_result(self, step_name: str) -> Any:
return self.step_results.get(step_name)
def get_all_context(self) -> str:
lines = [f"Original Input: {self.original_input}\n"]
for step, result in self.step_results.items():
lines.append(f"[{step}]: {str(result)[:300]}")
return "\n".join(lines)
# ─── LLM Helper ───────────────────────────────────────────────────
def llm_call(system: str, user: str, max_tokens: int = 800) -> str:
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system},
{"role": "user", "content": user}
],
temperature=0.3,
max_tokens=max_tokens
)
return response.choices[0].message.content
# ─── Define Individual Workflow Functions ─────────────────────────
def extract_key_data(state: WorkflowState) -> dict:
"""Extract key data points from the input document."""
result = llm_call(
system="You are a data extraction specialist. Extract key facts from documents.",
user=f"""Extract the following from this document:
- Main topic
- Key numbers or statistics (at least 3)
- Main conclusions
Document: {state.original_input}
Respond as JSON: {{"topic": "...", "statistics": [...], "conclusions": [...]}}"""
)
try:
return json.loads(result.replace("```json", "").replace("```", "").strip())
except Exception:
return {"raw": result}
def analyse_sentiment(state: WorkflowState) -> str:
"""Analyse the overall tone/sentiment of the content."""
extracted = state.get_result("extract_key_data")
return llm_call(
system="You are a sentiment analysis expert.",
user=f"""Analyse the sentiment of this data:
{json.dumps(extracted, indent=2)}
Is the overall tone positive, negative, or neutral?
Explain in 2-3 sentences."""
)
def identify_risks(state: WorkflowState) -> list:
"""Identify potential risks or concerns mentioned."""
result = llm_call(
system="You are a risk assessment analyst.",
user=f"""Based on this data, list the top 3 potential risks or concerns:
{state.get_all_context()}
Return as JSON array: ["Risk 1: ...", "Risk 2: ...", "Risk 3: ..."]"""
)
try:
return json.loads(result.replace("```json", "").replace("```", "").strip())
except Exception:
return [result]
def write_summary(state: WorkflowState) -> str:
"""Write a final executive summary based on all previous steps."""
return llm_call(
system="You are an expert business writer who creates clear executive summaries.",
user=f"""Create a concise executive summary (300 words max) based on this analysis:
{state.get_all_context()}
The summary should include: overview, key findings, sentiment, and top risks.""",
max_tokens=600
)
# ─── Orchestrator Class ───────────────────────────────────────────
class WorkflowOrchestrator:
def __init__(self):
self.steps = [
WorkflowStep("extract_key_data", "Extract key facts", extract_key_data),
WorkflowStep("analyse_sentiment", "Analyse sentiment", analyse_sentiment),
WorkflowStep("identify_risks", "Identify risks", identify_risks),
WorkflowStep("write_summary", "Write final summary", write_summary),
]
def run(self, input_text: str) -> str:
print(f"\n{'='*55}")
print("🚀 STARTING WORKFLOW")
print(f"{'='*55}")
state = WorkflowState(input_text)
for step in self.steps:
result = step.run(state)
state.add_result(step.name, result)
final_summary = state.get_result("write_summary")
print(f"\n{'='*55}")
print("📄 FINAL SUMMARY:")
print(f"{'='*55}")
print(final_summary)
return final_summary
# Test
if __name__ == "__main__":
sample_document = """
Q3 2024 Financial Report — TechCorp India
Revenue grew by 22% year-over-year to ₹5,200 crore.
Net profit increased to ₹820 crore, a 15% improvement.
Operating expenses rose by 18% due to hiring and R&D investments.
The company onboarded 45 new enterprise clients this quarter.
Cloud segment revenue reached ₹1,800 crore — up 38% from last year.
Challenges: global supply chain disruptions affected hardware delivery.
Currency headwinds reduced dollar-denominated revenue by 3%.
Competition from global tech firms intensified in the SaaS segment.
"""
orchestrator = WorkflowOrchestrator()
orchestrator.run(sample_document)
Conditional Branching in Workflows
Workflows can take different paths based on conditions — just like if-else logic in programming:
def route_by_urgency(state: WorkflowState) -> str:
"""Decide which path the workflow should take."""
topic = state.get_result("extract_key_data").get("topic", "").lower()
if "crisis" in topic or "emergency" in topic:
return "urgent_path"
elif "report" in topic or "analysis" in topic:
return "analysis_path"
else:
return "general_path"
class ConditionalOrchestrator:
def run(self, input_text: str):
state = WorkflowState(input_text)
# Step 1: Always extract data first
result = extract_key_data(state)
state.add_result("extract_key_data", result)
# Step 2: Route based on content
path = route_by_urgency(state)
print(f"Routing to: {path}")
if path == "urgent_path":
# Handle urgent content differently
print("⚠️ Urgent content detected — escalating...")
elif path == "analysis_path":
# Full analysis for reports
state.add_result("analyse_sentiment", analyse_sentiment(state))
state.add_result("identify_risks", identify_risks(state))
# Always write summary
state.add_result("write_summary", write_summary(state))
return state.get_result("write_summary")
Workflow Patterns Summary
| Pattern | Description | When to Use |
|---|---|---|
| Sequential | Steps run one after another | Most tasks with clear order |
| Parallel | Multiple steps run at the same time | Independent tasks needing speed |
| Conditional | Path chosen based on a condition | Different content types or priorities |
| Loop | Steps repeat until condition is met | Refinement tasks, iterative generation |
| Map-Reduce | Process many items, merge results | Batch processing large datasets |
Summary
Agent Orchestration is what transforms isolated agents into powerful, production-grade systems. By defining workflows — sequential, parallel, conditional, or looped — complex tasks can be broken down and handled by the right agent or function at the right time. The WorkflowOrchestrator pattern shown here is a foundation that scales from a simple 3-step pipeline to an enterprise-grade multi-agent system with dozens of specialised components.
