Implementation:Ucbepic Docetl CodeOperations

From Leeroopedia


Knowledge Sources
Domains Data_Processing, Code_Execution
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by DocETL, for running user-supplied Python code as deterministic, zero-cost data transformations in place of LLM calls.

Description

This module provides three operation classes: CodeMapOperation (transforms each document via a user-defined function), CodeReduceOperation (groups documents by key and reduces each group to a single document), and CodeFilterOperation (keeps only the documents for which the function returns True). Each operation reads a code string or callable from its configuration. A code string is run through Python's exec() to define the transform function (named transform in the examples on this page), and that function is then applied to the input documents in parallel using a ThreadPoolExecutor. No LLM is called, so every operation reports a cost of 0.0.
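The mechanism described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not DocETL's actual implementation: the helper names `load_transform` and `run_code_map` are hypothetical, the function name `transform` is taken from the YAML examples on this page, and the sketch returns the function's output as-is, which may differ from how DocETL combines results with the original document.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def load_transform(code: Any) -> Callable:
    """Return a callable from a code string or an existing callable.

    Hypothetical helper: assumes a code string defines a function
    named `transform`, as in the YAML examples on this page.
    """
    if callable(code):
        return code
    namespace: dict = {}
    exec(code, namespace)  # runs arbitrary code, so pass trusted input only
    return namespace["transform"]


def run_code_map(docs: list[dict], code: Any, threads: int = 4) -> tuple[list[dict], float]:
    """Apply the transform to every document in parallel; cost is 0.0."""
    transform = load_transform(code)
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Executor.map yields results in input order.
        results = list(pool.map(transform, docs))
    return results, 0.0


code_str = '''
def transform(doc):
    return {"length": len(doc["text"])}
'''
mapped, cost = run_code_map([{"text": "abc"}, {"text": "hello"}], code_str)
```

Threads suit this pattern when the transform is short or I/O-bound; for CPU-heavy pure-Python transforms, the interpreter lock limits the speedup that extra threads can give.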

Usage

Use these operations when deterministic Python logic is more appropriate, cheaper, or faster than LLM-based processing. Typical scenarios include data cleaning, field extraction with regex, mathematical transformations, key-based aggregation, programmatic filtering by field values, or any transformation that can be expressed as a simple Python function.
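As one illustration of the regex field extraction scenario, the following sketch shows a transform function of the kind a code_map operation could run. The `text` and `email` field names and the deliberately loose email pattern are assumptions chosen for the example, not part of DocETL.

```python
import re

# Intentionally simple pattern for illustration; real email validation is stricter.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")


def transform(doc: dict) -> dict:
    """Return a copy of the document with the first email address found in `text`."""
    match = EMAIL_RE.search(doc.get("text", ""))
    return {**doc, "email": match.group(0) if match else None}


result = transform({"text": "Contact jane.doe@example.com for details"})
```

The same input always yields the same output, which is the property that makes such logic a better fit for a code operation than for an LLM call.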

Code Reference

Source Location

Signature

class CodeMapOperation(BaseOperation):
    class schema(BaseOperation.schema):
        type: str = "code_map"
        code: Any
        concurrent_thread_count: int = os.cpu_count()
        drop_keys: list[str] | None = None
        limit: int | None = Field(None, gt=0)

    def syntax_check(self) -> None: ...
    def execute(self, input_data: list[dict]) -> tuple[list[dict], float]: ...


class CodeReduceOperation(BaseOperation):
    class schema(BaseOperation.schema):
        type: str = "code_reduce"
        code: Any
        concurrent_thread_count: int = os.cpu_count()
        limit: int | None = Field(None, gt=0)

    def syntax_check(self) -> None: ...
    def execute(self, input_data: list[dict]) -> tuple[list[dict], float]: ...


class CodeFilterOperation(BaseOperation):
    class schema(BaseOperation.schema):
        type: str = "code_filter"
        code: Any
        concurrent_thread_count: int = os.cpu_count()
        limit: int | None = Field(None, gt=0)

    def syntax_check(self) -> None: ...
    def execute(self, input_data: list[dict]) -> tuple[list[dict], float]: ...

Import

from docetl.operations.code_operations import CodeMapOperation
from docetl.operations.code_operations import CodeReduceOperation
from docetl.operations.code_operations import CodeFilterOperation

I/O Contract

Inputs (CodeMapOperation)

| Name | Type | Required | Description |
|---|---|---|---|
| input_data | List[Dict] | Yes | Documents to transform |
| code | str or Callable | Yes | Python code defining a transform function that takes a dict and returns a dict |
| drop_keys | List[str] | No | Keys to remove from transformed output |
| concurrent_thread_count | int | No | Number of parallel threads (default: CPU count) |
| limit | int | No | Maximum number of input documents to process |
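The effect of the optional `limit` and `drop_keys` inputs can be sketched as follows. This is an assumption-laden illustration rather than DocETL's code: the helper `apply_map_options` is hypothetical, and it assumes `limit` keeps the first documents in input order and that `drop_keys` is applied after the transform.

```python
from typing import Callable, Optional


def apply_map_options(
    docs: list[dict],
    transform: Callable[[dict], dict],
    drop_keys: Optional[list[str]] = None,
    limit: Optional[int] = None,
) -> list[dict]:
    """Hypothetical helper showing limit and drop_keys around a transform."""
    # Assumption: limit truncates the input from the front.
    selected = docs if limit is None else docs[:limit]
    # Pass a copy so the caller's documents are not mutated.
    outputs = [transform(dict(doc)) for doc in selected]
    if drop_keys:
        outputs = [
            {k: v for k, v in out.items() if k not in drop_keys}
            for out in outputs
        ]
    return outputs


def transform(doc: dict) -> dict:
    doc["clean"] = doc["raw"].strip().lower()
    return doc


docs = [{"id": 1, "raw": " A "}, {"id": 2, "raw": " B "}, {"id": 3, "raw": " C "}]
cleaned = apply_map_options(docs, transform, drop_keys=["raw"], limit=2)
```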

Inputs (CodeReduceOperation)

| Name | Type | Required | Description |
|---|---|---|---|
| input_data | List[Dict] | Yes | Documents to group and reduce |
| code | str or Callable | Yes | Python code defining a transform function that takes a list of dicts and returns a dict |
| reduce_key | str or List[str] | No | Key(s) to group by (default "_all" for a single group) |
| pass_through | bool | No | Whether to pass through fields from the first group item |
| limit | int | No | Maximum number of groups to process |
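The group-then-reduce contract above can be sketched in a few lines. The helper `run_code_reduce` is hypothetical and simplified: it assumes groups are processed in first-seen order, treats `"_all"` as a single group, and does not model `pass_through`.

```python
from collections import defaultdict
from typing import Callable, Optional, Union


def run_code_reduce(
    docs: list[dict],
    transform: Callable[[list[dict]], dict],
    reduce_key: Union[str, list[str]] = "_all",
    limit: Optional[int] = None,
) -> list[dict]:
    """Hypothetical helper: group documents by key(s), then reduce each group."""
    if reduce_key == "_all":
        groups = [list(docs)]  # a single group containing every document
    else:
        keys = [reduce_key] if isinstance(reduce_key, str) else list(reduce_key)
        grouped: dict = defaultdict(list)
        for doc in docs:
            grouped[tuple(doc[k] for k in keys)].append(doc)
        groups = list(grouped.values())  # dicts preserve first-seen order
    if limit is not None:
        groups = groups[:limit]  # assumption: keep the first `limit` groups
    return [transform(items) for items in groups]


def transform(items: list[dict]) -> dict:
    return {
        "category": items[0]["category"],
        "count": len(items),
        "avg_score": sum(i["score"] for i in items) / len(items),
    }


docs = [
    {"category": "a", "score": 1},
    {"category": "b", "score": 4},
    {"category": "a", "score": 3},
]
reduced = run_code_reduce(docs, transform, reduce_key="category")
```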

Inputs (CodeFilterOperation)

| Name | Type | Required | Description |
|---|---|---|---|
| input_data | List[Dict] | Yes | Documents to filter |
| code | str or Callable | Yes | Python code defining a transform function that takes a dict and returns a bool |
| concurrent_thread_count | int | No | Number of parallel threads (default: CPU count) |
| limit | int | No | Maximum number of documents to keep |
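A filter with a cap on kept documents might look like the sketch below. The helper `run_code_filter` is hypothetical and sequential for clarity; it assumes documents are kept in input order, that processing stops once `limit` documents have been kept, and that any truthy return value counts as True.

```python
from typing import Callable, Optional


def run_code_filter(
    docs: list[dict],
    transform: Callable[[dict], bool],
    limit: Optional[int] = None,
) -> tuple[list[dict], float]:
    """Hypothetical helper: keep documents whose transform result is truthy."""
    kept: list[dict] = []
    for doc in docs:
        if limit is not None and len(kept) >= limit:
            break  # assumption: stop once enough documents are kept
        if transform(doc):
            kept.append(doc)
    return kept, 0.0  # no LLM is called, so the cost is 0.0


def transform(doc: dict) -> bool:
    return len(doc.get("text", "")) > 3


docs = [{"text": "hi"}, {"text": "hello"}, {"text": "world!"}, {}]
kept_all, cost = run_code_filter(docs, transform)
kept_one, _ = run_code_filter(docs, transform, limit=1)
```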

Outputs

| Name | Type | Description |
|---|---|---|
| output | Tuple[List[Dict], float] | Transformed/filtered/reduced documents and cost (always 0.0) |

Usage Examples

# YAML pipeline configuration for code_map
operations:
  - name: clean_text
    type: code_map
    code: |
      def transform(doc):
          doc["text"] = doc["text"].strip().lower()
          doc["word_count"] = len(doc["text"].split())
          return doc

# YAML pipeline configuration for code_reduce
operations:
  - name: aggregate_by_category
    type: code_reduce
    reduce_key: category
    code: |
      def transform(items):
          return {
              "category": items[0]["category"],
              "count": len(items),
              "avg_score": sum(i["score"] for i in items) / len(items),
          }

# YAML pipeline configuration for code_filter
operations:
  - name: filter_long_docs
    type: code_filter
    code: |
      def transform(doc):
          return len(doc.get("text", "")) > 100

# Python API usage with callable
from docetl.operations.code_operations import CodeMapOperation

def my_transform(doc):
    doc["upper_title"] = doc["title"].upper()
    return doc

config = {
    "name": "uppercase_titles",
    "type": "code_map",
    "code": my_transform,
}
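# runner, default_model, max_threads, and input_data are not defined in this
# snippet; they are assumed to come from the surrounding pipeline setup.
op = CodeMapOperation(runner, config, default_model, max_threads)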
results, cost = op.execute(input_data)
# cost is always 0.0
