Overview
Concrete tool for executing user-supplied Python code as deterministic, zero-cost data transformations in place of LLM calls, provided by DocETL.
Description
This module provides three operation classes: CodeMapOperation (transforms each document via a user-defined function), CodeReduceOperation (groups documents by key and reduces each group), and CodeFilterOperation (keeps documents where the function returns True). Each operation takes a code string or callable from the configuration, executes it via Python's exec() to define a transform function, and applies that function to input documents using a ThreadPoolExecutor for parallel processing. All operations incur zero LLM cost.
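The mechanism described above (define a function via exec(), then apply it in a thread pool) can be sketched as follows. This is a minimal illustration, not DocETL's actual implementation: the helper names `load_transform` and `run_map` are hypothetical, and the sketch assumes the code string defines a function named `transform`, as the usage examples on this page do.

```python
from concurrent.futures import ThreadPoolExecutor


def load_transform(code):
    """Hypothetical helper: turn a code string (or a callable) into a function."""
    if callable(code):
        return code
    namespace = {}
    exec(code, namespace)  # runs the user's code, which should define `transform`
    if "transform" not in namespace:
        raise ValueError("code must define a function named 'transform'")
    return namespace["transform"]


def run_map(code, docs, max_workers=4):
    """Hypothetical helper: apply the transform to every document in parallel."""
    transform = load_transform(code)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order
        results = list(pool.map(transform, docs))
    return results, 0.0  # no LLM is called, so the reported cost is zero


code = '''
def transform(doc):
    return {"length": len(doc["text"])}
'''
out, cost = run_map(code, [{"text": "abc"}, {"text": "hello"}])
```

Because exec() runs arbitrary Python, code strings should come only from trusted pipeline authors.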
Usage
Use these operations when deterministic Python logic is more appropriate, cheaper, or faster than LLM-based processing. Typical scenarios include data cleaning, field extraction with regex, mathematical transformations, key-based aggregation, programmatic filtering by field values, or any transformation that can be expressed as a simple Python function.
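As one illustration of the regex field-extraction scenario, a transform function might look like the sketch below. The field names `text`, `emails`, and `email_count` are assumptions chosen for the example, and the pattern is deliberately simple rather than a complete email matcher.

```python
import re

# Simplified email pattern, for illustration only
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def transform(doc):
    """Return a copy of the document with extracted email addresses added."""
    emails = EMAIL_RE.findall(doc.get("text", ""))
    return {**doc, "emails": emails, "email_count": len(emails)}


result = transform({"text": "Contact a@example.com or b@test.org."})
```

Returning a new dict instead of mutating the input keeps the function free of side effects, which is convenient when it runs across multiple threads.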
Code Reference
Source Location
Signature
class CodeMapOperation(BaseOperation):
    class schema(BaseOperation.schema):
        type: str = "code_map"
        code: Any
        concurrent_thread_count: int = os.cpu_count()
        drop_keys: list[str] | None = None
        limit: int | None = Field(None, gt=0)

    def syntax_check(self) -> None: ...
    def execute(self, input_data: list[dict]) -> tuple[list[dict], float]: ...

class CodeReduceOperation(BaseOperation):
    class schema(BaseOperation.schema):
        type: str = "code_reduce"
        code: Any
        concurrent_thread_count: int = os.cpu_count()
        limit: int | None = Field(None, gt=0)

    def syntax_check(self) -> None: ...
    def execute(self, input_data: list[dict]) -> tuple[list[dict], float]: ...

class CodeFilterOperation(BaseOperation):
    class schema(BaseOperation.schema):
        type: str = "code_filter"
        code: Any
        concurrent_thread_count: int = os.cpu_count()
        limit: int | None = Field(None, gt=0)

    def syntax_check(self) -> None: ...
    def execute(self, input_data: list[dict]) -> tuple[list[dict], float]: ...
Import
from docetl.operations.code_operations import CodeMapOperation
from docetl.operations.code_operations import CodeReduceOperation
from docetl.operations.code_operations import CodeFilterOperation
I/O Contract
Inputs (CodeMapOperation)
| Name | Type | Required | Description |
|---|---|---|---|
| input_data | List[Dict] | Yes | Documents to transform |
| code | str or Callable | Yes | Python code defining a transform function that takes a dict and returns a dict |
| drop_keys | List[str] | No | Keys to remove from transformed output |
| concurrent_thread_count | int | No | Number of parallel threads (default: CPU count) |
| limit | int | No | Maximum number of input documents to process |
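The interplay of `limit` and `drop_keys` for a map step can be sketched in plain Python. This is a sequential illustration under stated assumptions, not the operation's actual code: `apply_map` is a hypothetical helper, and it assumes `limit` selects the first N input documents and that `drop_keys` is applied to the dict returned by the transform.

```python
def apply_map(docs, transform, drop_keys=None, limit=None):
    """Hypothetical helper mirroring the inputs listed for CodeMapOperation."""
    selected = docs if limit is None else docs[:limit]  # assumption: first N docs
    out = []
    for doc in selected:
        result = transform(dict(doc))  # shallow copy so the input is not mutated
        if drop_keys:
            result = {k: v for k, v in result.items() if k not in drop_keys}
        out.append(result)
    return out


def add_word_count(doc):
    doc["word_count"] = len(doc["text"].split())
    return doc


docs = [
    {"id": 1, "text": "a b c"},
    {"id": 2, "text": "d e"},
    {"id": 3, "text": "f"},
]
mapped = apply_map(docs, add_word_count, drop_keys=["text"], limit=2)
```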
Inputs (CodeReduceOperation)
| Name | Type | Required | Description |
|---|---|---|---|
| input_data | List[Dict] | Yes | Documents to group and reduce |
| code | str or Callable | Yes | Python code defining a transform function that takes a list of dicts and returns a dict |
| reduce_key | str or List[str] | No | Key(s) to group by (default "_all" for single group) |
| pass_through | bool | No | Whether to pass through fields from the first group item |
| limit | int | No | Maximum number of groups to process |
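Grouping by `reduce_key` and reducing each group can be sketched as below. `apply_reduce` is a hypothetical helper, not the library's code. It assumes that `limit` keeps the first N groups in encounter order and that `pass_through` merges the first item's fields underneath the reducer's output; neither detail is confirmed by this page.

```python
from collections import defaultdict


def apply_reduce(docs, transform, reduce_key="_all", pass_through=False, limit=None):
    """Hypothetical helper mirroring the inputs listed for CodeReduceOperation."""
    keys = [reduce_key] if isinstance(reduce_key, str) else list(reduce_key)
    groups = defaultdict(list)
    for doc in docs:
        # "_all" places every document in one group
        group_id = ("_all",) if keys == ["_all"] else tuple(doc[k] for k in keys)
        groups[group_id].append(doc)

    grouped = list(groups.values())  # insertion order is preserved
    if limit is not None:
        grouped = grouped[:limit]  # assumption: first N groups

    out = []
    for items in grouped:
        result = transform(items)
        if pass_through:
            # assumption: reducer output overrides fields of the first item
            result = {**items[0], **result}
        out.append(result)
    return out


def summarize(items):
    return {
        "category": items[0]["category"],
        "count": len(items),
        "avg_score": sum(i["score"] for i in items) / len(items),
    }


docs = [
    {"category": "a", "score": 1},
    {"category": "b", "score": 3},
    {"category": "a", "score": 3},
]
reduced = apply_reduce(docs, summarize, reduce_key="category")
```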
Inputs (CodeFilterOperation)
| Name | Type | Required | Description |
|---|---|---|---|
| input_data | List[Dict] | Yes | Documents to filter |
| code | str or Callable | Yes | Python code defining a transform function that takes a dict and returns a bool |
| concurrent_thread_count | int | No | Number of parallel threads (default: CPU count) |
| limit | int | No | Maximum number of documents to keep |
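The filter semantics, including a `limit` on the number of kept documents, can be sketched sequentially as follows. `apply_filter` is a hypothetical helper; the real operation evaluates documents in a thread pool, and stopping early once the limit is reached is an assumption of this sketch.

```python
def apply_filter(docs, predicate, limit=None):
    """Hypothetical helper: keep documents for which the predicate is truthy."""
    kept = []
    for doc in docs:
        if limit is not None and len(kept) >= limit:
            break  # assumption: stop once enough documents are kept
        if predicate(doc):
            kept.append(doc)
    return kept


def is_long(doc):
    return len(doc.get("text", "")) > 3


docs = [{"text": "hi"}, {"text": "hello"}, {}, {"text": "world!"}]
all_long = apply_filter(docs, is_long)
first_long = apply_filter(docs, is_long, limit=1)
```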
Outputs
| Name | Type | Description |
|---|---|---|
| output | Tuple[List[Dict], float] | Transformed/filtered/reduced documents and cost (always 0.0) |
Usage Examples
# YAML pipeline configuration for code_map
operations:
  - name: clean_text
    type: code_map
    code: |
      def transform(doc):
          doc["text"] = doc["text"].strip().lower()
          doc["word_count"] = len(doc["text"].split())
          return doc

# YAML pipeline configuration for code_reduce
operations:
  - name: aggregate_by_category
    type: code_reduce
    reduce_key: category
    code: |
      def transform(items):
          return {
              "category": items[0]["category"],
              "count": len(items),
              "avg_score": sum(i["score"] for i in items) / len(items),
          }

# YAML pipeline configuration for code_filter
operations:
  - name: filter_long_docs
    type: code_filter
    code: |
      def transform(doc):
          return len(doc.get("text", "")) > 100
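# Python API usage with callable
from docetl.operations.code_operations import CodeMapOperation

def my_transform(doc):
    # Add an upper-cased copy of the title to each document
    doc["upper_title"] = doc["title"].upper()
    return doc

config = {
    "name": "uppercase_titles",
    "type": "code_map",
    "code": my_transform,
}

# runner, default_model, max_threads, and input_data are not defined here;
# they are assumed to come from the surrounding pipeline context.
op = CodeMapOperation(runner, config, default_model, max_threads)
results, cost = op.execute(input_data)
# cost is always 0.0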
Related Pages