Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:CrewAIInc CrewAI Crew Kickoff In Flow

From Leeroopedia
Revision as of 11:07, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/CrewAIInc_CrewAI_Crew_Kickoff_In_Flow.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)

Overview

Crew Kickoff in Flow documents the pattern for calling Crew.kickoff() within Flow method bodies to bridge the crew execution model and the flow execution model. This is a Wrapper Doc -- it describes how the existing Crew.kickoff() API is used in the context of flow methods rather than documenting a single class or function.

Source Reference

Component File Lines
Crew.kickoff() method src/crewai/crew.py L696-773
Flow base class (hosts crew-calling methods) src/crewai/flow/flow.py L690-778
CrewOutput result type src/crewai/crew.py (return type of kickoff)

Signature

# Crew.kickoff() signature
def kickoff(
    self,
    inputs: dict[str, Any] | None = None,
    input_files: dict[str, FileInput] | None = None,
) -> CrewOutput | CrewStreamingOutput:
    """Execute the crew's workflow.

    Args:
        inputs: Optional input dictionary for task interpolation.
        input_files: Optional dict of named file inputs for the crew.

    Returns:
        CrewOutput or CrewStreamingOutput if streaming is enabled.
    """

Import

from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, FlowState, start, listen

I/O Contract

Stage Type Description
Flow state to crew inputs dict[str, Any] Extract fields from self.state into a dict passed to crew.kickoff(inputs=...)
Crew execution CrewOutput Crew processes tasks using agents, returns CrewOutput
Crew output to flow state str / Pydantic model / dict Extract .raw, .pydantic, or .json_dict from CrewOutput and assign to self.state

Detailed Behavior

Data Flow: State to Crew to State

The integration follows a three-phase data flow:

Phase 1: State to Crew Inputs

Flow methods read from self.state and construct the inputs dictionary. Task descriptions in the crew use {variable_name} placeholders that are interpolated with values from this inputs dict.

@listen(gather_requirements)
def research_topic(self):
    inputs = {
        "topic": self.state.topic,
        "constraints": self.state.constraints,
    }
    # Task descriptions reference {topic} and {constraints}
    result = research_crew.kickoff(inputs=inputs)

Phase 2: Crew Execution

Crew.kickoff() executes the crew's tasks either sequentially (Process.sequential) or hierarchically (Process.hierarchical). The crew is self-contained and does not access the flow's state directly. It returns a CrewOutput object.

Phase 3: Crew Output to State

The flow method extracts results from CrewOutput and stores them back into self.state:

  • result.raw -- The raw string output from the final task
  • result.pydantic -- A Pydantic model if output_pydantic was set on the final task
  • result.json_dict -- A dictionary if output_json was set on the final task

Error Handling

If a crew raises an exception during execution, it propagates up through the flow method. The flow's event system emits a MethodExecutionFailedEvent. Developers can use try/except within the flow method to handle crew failures gracefully:

@listen(prepare_data)
def run_analysis(self):
    try:
        result = analysis_crew.kickoff(inputs={"data": self.state.data})
        self.state.analysis = result.raw
    except Exception as e:
        self.state.analysis = f"Analysis failed: {e}"
        self.state.status = "error"

Example

from crewai import Agent, Task, Crew, Process
from crewai.flow.flow import Flow, FlowState, start, listen
from pydantic import Field


class ContentState(FlowState):
    topic: str = ""
    research: str = ""
    article: str = ""


class ContentPipeline(Flow[ContentState]):

    @start()
    def set_topic(self):
        self.state.topic = "The Future of AI Agents"
        return self.state.topic

    @listen(set_topic)
    def research_topic(self):
        """Use a research crew to gather information."""
        researcher = Agent(
            role="Research Analyst",
            goal=f"Research {self.state.topic} thoroughly",
            backstory="Expert researcher with deep knowledge of AI.",
        )
        research_task = Task(
            description="Research the topic: {topic}. Provide detailed findings.",
            expected_output="Comprehensive research notes",
            agent=researcher,
        )
        research_crew = Crew(
            agents=[researcher],
            tasks=[research_task],
            process=Process.sequential,
        )

        result = research_crew.kickoff(inputs={"topic": self.state.topic})
        self.state.research = result.raw
        return self.state.research

    @listen(research_topic)
    def write_article(self):
        """Use a writing crew to produce the final article."""
        writer = Agent(
            role="Content Writer",
            goal="Write an engaging article based on research",
            backstory="Skilled writer who creates compelling content.",
        )
        writing_task = Task(
            description="Write an article about {topic} using these notes: {research}",
            expected_output="A polished article of 500+ words",
            agent=writer,
        )
        writing_crew = Crew(
            agents=[writer],
            tasks=[writing_task],
            process=Process.sequential,
        )

        result = writing_crew.kickoff(inputs={
            "topic": self.state.topic,
            "research": self.state.research,
        })
        self.state.article = result.raw
        return self.state.article


# Execute the full pipeline
flow = ContentPipeline()
final_article = flow.kickoff()
print(flow.state.article)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment