Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:CrewAIInc CrewAI Crew Kickoff In Flow

From Leeroopedia

Overview

Crew Kickoff in Flow documents the pattern for calling Crew.kickoff() within Flow method bodies to bridge the crew execution model and the flow execution model. This is a Wrapper Doc -- it describes how the existing Crew.kickoff() API is used in the context of flow methods rather than documenting a single class or function.

Source Reference

Component File Lines
Crew.kickoff() method src/crewai/crew.py L696-773
Flow base class (hosts crew-calling methods) src/crewai/flow/flow.py L690-778
CrewOutput result type src/crewai/crew.py (return type of kickoff)

Signature

# Crew.kickoff() signature
def kickoff(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
) -> CrewOutput | CrewStreamingOutput:
    """Execute the crew's workflow.

    Args:
        inputs: Optional input dictionary for task interpolation.
        input_files: Optional dict of named file inputs for the crew.

    Returns:
        CrewOutput or CrewStreamingOutput if streaming is enabled.
    """

Import

from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, FlowState, start, listen

I/O Contract

Stage Type Description
Flow state to crew inputs dict[str, Any] Extract fields from self.state into a dict passed to crew.kickoff(inputs=...)
Crew execution CrewOutput Crew processes tasks using agents, returns CrewOutput
Crew output to flow state str / Pydantic model / dict Extract .raw, .pydantic, or .json_dict from CrewOutput and assign to self.state

Detailed Behavior

Data Flow: State to Crew to State

The integration follows a three-phase data flow:

Phase 1: State to Crew Inputs

Flow methods read from self.state and construct the inputs dictionary. Task descriptions in the crew use {variable_name} placeholders that are interpolated with values from this inputs dict.

@listen(gather_requirements)
def research_topic(self):
    inputs = {
        "topic": self.state.topic,
        "constraints": self.state.constraints,
    }
    # Task descriptions reference {topic} and {constraints}
    result = research_crew.kickoff(inputs=inputs)

Phase 2: Crew Execution

Crew.kickoff() executes the crew's tasks either sequentially (Process.sequential) or hierarchically (Process.hierarchical). The crew is self-contained and does not access the flow's state directly. It returns a CrewOutput object.

Phase 3: Crew Output to State

The flow method extracts results from CrewOutput and stores them back into self.state:

  • result.raw -- The raw string output from the final task
  • result.pydantic -- A Pydantic model if output_pydantic was set on the final task
  • result.json_dict -- A dictionary if output_json was set on the final task

Error Handling

If a crew raises an exception during execution, it propagates up through the flow method. The flow's event system emits a MethodExecutionFailedEvent. Developers can use try/except within the flow method to handle crew failures gracefully:

@listen(prepare_data)
def run_analysis(self):
    try:
        result = analysis_crew.kickoff(inputs={"data": self.state.data})
        self.state.analysis = result.raw
    except Exception as e:
        self.state.analysis = f"Analysis failed: {e}"
        self.state.status = "error"

Example

from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, FlowState, start, listen
from pydantic import Field


class ContentState(FlowState):
    topic: str = ""
    research: str = ""
    article: str = ""


class ContentPipeline(Flow[ContentState]):

    @start()
    def set_topic(self):
        self.state.topic = "The Future of AI Agents"
        return self.state.topic

    @listen(set_topic)
    def research_topic(self):
        """Use a research crew to gather information."""
        researcher = Agent(
            role="Research Analyst",
            goal=f"Research {self.state.topic} thoroughly",
            backstory="Expert researcher with deep knowledge of AI.",
        )
        research_task = Task(
            description="Research the topic: {topic}. Provide detailed findings.",
            expected_output="Comprehensive research notes",
            agent=researcher,
        )
        research_crew = Crew(
            agents=[researcher],
            tasks=[research_task],
            process=Process.sequential,
        )

        result = research_crew.kickoff(inputs={"topic": self.state.topic})
        self.state.research = result.raw
        return self.state.research

    @listen(research_topic)
    def write_article(self):
        """Use a writing crew to produce the final article."""
        writer = Agent(
            role="Content Writer",
            goal="Write an engaging article based on research",
            backstory="Skilled writer who creates compelling content.",
        )
        writing_task = Task(
            description="Write an article about {topic} using these notes: {research}",
            expected_output="A polished article of 500+ words",
            agent=writer,
        )
        writing_crew = Crew(
            agents=[writer],
            tasks=[writing_task],
            process=Process.sequential,
        )

        result = writing_crew.kickoff(inputs={
            "topic": self.state.topic,
            "research": self.state.research,
        })
        self.state.article = result.raw
        return self.state.article


# Execute the full pipeline
flow = ContentPipeline()
final_article = flow.kickoff()
print(flow.state.article)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment