Implementation:CrewAIInc CrewAI Crew Kickoff In Flow
Overview
Crew Kickoff in Flow documents the pattern for calling Crew.kickoff() within Flow method bodies to bridge the crew execution model and the flow execution model. This is a Wrapper Doc -- it describes how the existing Crew.kickoff() API is used in the context of flow methods rather than documenting a single class or function.
Source Reference
| Component | File | Lines |
|---|---|---|
Crew.kickoff() method |
src/crewai/crew.py |
L696-773 |
Flow base class (hosts crew-calling methods) |
src/crewai/flow/flow.py |
L690-778 |
CrewOutput result type |
src/crewai/crew.py |
(return type of kickoff) |
Signature
# Crew.kickoff() signature
def kickoff(
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
) -> CrewOutput | CrewStreamingOutput:
"""Execute the crew's workflow.
Args:
inputs: Optional input dictionary for task interpolation.
input_files: Optional dict of named file inputs for the crew.
Returns:
CrewOutput or CrewStreamingOutput if streaming is enabled.
"""
Import
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, FlowState, start, listen
I/O Contract
| Stage | Type | Description |
|---|---|---|
| Flow state to crew inputs | dict[str, Any] |
Extract fields from self.state into a dict passed to crew.kickoff(inputs=...)
|
| Crew execution | CrewOutput |
Crew processes tasks using agents, returns CrewOutput
|
| Crew output to flow state | str / Pydantic model / dict |
Extract .raw, .pydantic, or .json_dict from CrewOutput and assign to self.state
|
Detailed Behavior
Data Flow: State to Crew to State
The integration follows a three-phase data flow:
Phase 1: State to Crew Inputs
Flow methods read from self.state and construct the inputs dictionary. Task descriptions in the crew use {variable_name} placeholders that are interpolated with values from this inputs dict.
@listen(gather_requirements)
def research_topic(self):
inputs = {
"topic": self.state.topic,
"constraints": self.state.constraints,
}
# Task descriptions reference {topic} and {constraints}
result = research_crew.kickoff(inputs=inputs)
Phase 2: Crew Execution
Crew.kickoff() executes the crew's tasks either sequentially (Process.sequential) or hierarchically (Process.hierarchical). The crew is self-contained and does not access the flow's state directly. It returns a CrewOutput object.
Phase 3: Crew Output to State
The flow method extracts results from CrewOutput and stores them back into self.state:
result.raw-- The raw string output from the final taskresult.pydantic-- A Pydantic model ifoutput_pydanticwas set on the final taskresult.json_dict-- A dictionary ifoutput_jsonwas set on the final task
Error Handling
If a crew raises an exception during execution, it propagates up through the flow method. The flow's event system emits a MethodExecutionFailedEvent. Developers can use try/except within the flow method to handle crew failures gracefully:
@listen(prepare_data)
def run_analysis(self):
try:
result = analysis_crew.kickoff(inputs={"data": self.state.data})
self.state.analysis = result.raw
except Exception as e:
self.state.analysis = f"Analysis failed: {e}"
self.state.status = "error"
Example
from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, FlowState, start, listen
from pydantic import Field
class ContentState(FlowState):
topic: str = ""
research: str = ""
article: str = ""
class ContentPipeline(Flow[ContentState]):
@start()
def set_topic(self):
self.state.topic = "The Future of AI Agents"
return self.state.topic
@listen(set_topic)
def research_topic(self):
"""Use a research crew to gather information."""
researcher = Agent(
role="Research Analyst",
goal=f"Research {self.state.topic} thoroughly",
backstory="Expert researcher with deep knowledge of AI.",
)
research_task = Task(
description="Research the topic: {topic}. Provide detailed findings.",
expected_output="Comprehensive research notes",
agent=researcher,
)
research_crew = Crew(
agents=[researcher],
tasks=[research_task],
process=Process.sequential,
)
result = research_crew.kickoff(inputs={"topic": self.state.topic})
self.state.research = result.raw
return self.state.research
@listen(research_topic)
def write_article(self):
"""Use a writing crew to produce the final article."""
writer = Agent(
role="Content Writer",
goal="Write an engaging article based on research",
backstory="Skilled writer who creates compelling content.",
)
writing_task = Task(
description="Write an article about {topic} using these notes: {research}",
expected_output="A polished article of 500+ words",
agent=writer,
)
writing_crew = Crew(
agents=[writer],
tasks=[writing_task],
process=Process.sequential,
)
result = writing_crew.kickoff(inputs={
"topic": self.state.topic,
"research": self.state.research,
})
self.state.article = result.raw
return self.state.article
# Execute the full pipeline
flow = ContentPipeline()
final_article = flow.kickoff()
print(flow.state.article)
Related Pages
- Principle:CrewAIInc_CrewAI_Crew_Integration_In_Flow
- Implementation:CrewAIInc_CrewAI_Flow_State_Model -- State model used to pass data to and from crews
- Implementation:CrewAIInc_CrewAI_Flow_Decorators -- Decorators on the methods that call
crew.kickoff() - Implementation:CrewAIInc_CrewAI_Flow_Kickoff_And_Plot -- Flow execution that triggers crew-calling methods