Implementation:ArroyoSystems Arroyo Python Interpreter
| Knowledge Sources | |
|---|---|
| Domains | Streaming, UDF, Python |
| Last Updated | 2026-02-08 08:00 GMT |
Overview
SubInterpreter provides isolated Python sub-interpreter support for Arroyo's Python UDF system. Using CPython's sub-interpreter API, each UDF executes in its own interpreter with its own GIL, independent of the interpreters running other UDFs.
Description
The SubInterpreter struct wraps CPython's sub-interpreter functionality (Py_NewInterpreterFromConfig / Py_EndInterpreter) to provide isolated Python execution environments. This is adapted from the RisingWave Labs arrow-udf-python project.
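The create/destroy pairing described above fits Rust's RAII pattern. The following is a stdlib-only sketch of that shape, not the Arroyo source. `new_interpreter_from_config` and `end_interpreter` are hypothetical placeholders for Py_NewInterpreterFromConfig and Py_EndInterpreter, and a counter stands in for CPython's own interpreter bookkeeping so the effect of `Drop` is observable.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical counter standing in for CPython's own interpreter bookkeeping.
static LIVE_INTERPRETERS: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for the opaque `*mut PyThreadState` handle.
struct FakeThreadState;

// Placeholder for Py_NewInterpreterFromConfig: "creates" an interpreter.
fn new_interpreter_from_config() -> Result<FakeThreadState, String> {
    LIVE_INTERPRETERS.fetch_add(1, Ordering::SeqCst);
    Ok(FakeThreadState)
}

// Placeholder for Py_EndInterpreter; takes ownership so a state is ended once.
fn end_interpreter(_state: FakeThreadState) {
    LIVE_INTERPRETERS.fetch_sub(1, Ordering::SeqCst);
}

/// RAII wrapper: the interpreter is created in `new` and ended in `Drop`.
pub struct SketchInterpreter {
    state: Option<FakeThreadState>,
}

impl SketchInterpreter {
    pub fn new() -> Result<Self, String> {
        Ok(Self { state: Some(new_interpreter_from_config()?) })
    }
}

impl Drop for SketchInterpreter {
    fn drop(&mut self) {
        // `take` guarantees end_interpreter runs at most once per state.
        if let Some(state) = self.state.take() {
            end_interpreter(state);
        }
    }
}

/// Number of sketch interpreters currently alive.
pub fn live_interpreters() -> usize {
    LIVE_INTERPRETERS.load(Ordering::SeqCst)
}
```

Tying teardown to `Drop` means a sub-interpreter cannot be leaked on an early return or `?` error path.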
Key design decisions:
- GIL Isolation: Each sub-interpreter owns its own GIL (PyInterpreterConfig_OWN_GIL, the per-interpreter GIL introduced in CPython 3.12 by PEP 684), so multiple Python UDFs can execute concurrently instead of contending for a single process-wide GIL.
- Safety Configuration: Sub-interpreters are configured with restrictive permissions: no forking (allow_fork: 0), no exec (allow_exec: 0), no threads (allow_threads: 0), no daemon threads (allow_daemon_threads: 0), and multi-interpreter extension checking enabled (check_multi_interp_extensions: 1), which makes imports of extension modules that do not support sub-interpreters fail.
- Initialization: Before creating sub-interpreters, the decimal module is imported in the main interpreter to avoid a known SIGABRT bug when importing decimal in the second sub-interpreter.
- with_gil: The main API method acquires the sub-interpreter's GIL via PyEval_RestoreThread, creates a pyo3 GILPool to maintain pyo3's internal GIL reference count, executes the closure, and releases the GIL via PyEval_SaveThread.
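Three of these decisions can be modeled without linking CPython. In this hypothetical sketch, `InterpreterConfigSketch` mirrors the field names of CPython's `PyInterpreterConfig` as a plain Rust struct (it is not the FFI type), a `std::sync::Once` stands in for the one-time main-interpreter setup such as the `decimal` pre-import, and a thread-local cell plays the role of the thread state that PyEval_RestoreThread and PyEval_SaveThread swap in and out. The pyo3 GILPool step is omitted. The drop guard releases the stand-in GIL even if the closure panics; this page does not say whether Arroyo's with_gil does the same.

```rust
use std::cell::Cell;
use std::sync::Once;

/// Hypothetical plain-Rust mirror of the PyInterpreterConfig fields above.
#[derive(Debug, Clone, Copy)]
pub struct InterpreterConfigSketch {
    pub allow_fork: i32,
    pub allow_exec: i32,
    pub allow_threads: i32,
    pub allow_daemon_threads: i32,
    pub check_multi_interp_extensions: i32,
    pub own_gil: bool, // stands in for `gil = PyInterpreterConfig_OWN_GIL`
}

/// The restrictive settings described in the list above.
pub const RESTRICTIVE: InterpreterConfigSketch = InterpreterConfigSketch {
    allow_fork: 0,
    allow_exec: 0,
    allow_threads: 0,
    allow_daemon_threads: 0,
    check_multi_interp_extensions: 1,
    own_gil: true,
};

static MAIN_INIT: Once = Once::new();

/// Runs `init` at most once per process, before any sub-interpreter is made
/// (e.g. a hypothetical hook that imports `decimal` in the main interpreter).
pub fn ensure_main_initialized(init: impl FnOnce()) {
    MAIN_INIT.call_once(init);
}

thread_local! {
    // Stand-in for "which interpreter this OS thread currently holds the GIL of".
    static CURRENT: Cell<Option<u32>> = Cell::new(None);
}

// Stand-in for PyEval_RestoreThread: make interpreter `id` current here.
fn restore_thread(id: u32) {
    CURRENT.with(|c| {
        assert!(c.get().is_none(), "GIL already held on this thread");
        c.set(Some(id));
    });
}

// Stand-in for PyEval_SaveThread: release whatever is current.
fn save_thread() -> Option<u32> {
    CURRENT.with(|c| c.take())
}

pub fn current_interpreter() -> Option<u32> {
    CURRENT.with(|c| c.get())
}

/// Releases the stand-in GIL when dropped, including during a panic unwind.
struct ReleaseOnDrop;

impl Drop for ReleaseOnDrop {
    fn drop(&mut self) {
        save_thread();
    }
}

/// Sketch of the with_gil bracket: acquire, run the closure, release.
pub fn with_gil<R>(id: u32, f: impl FnOnce(u32) -> Result<R, String>) -> Result<R, String> {
    restore_thread(id);
    let _guard = ReleaseOnDrop;
    f(id)
}
```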
The PyError type wraps anyhow::Error and intentionally discards the original PyErr type information (retaining only the message string) to prevent use-after-free issues when PyErr objects are dropped outside their originating sub-interpreter.
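The message-only conversion can be illustrated with a stand-in error type. In this hypothetical sketch, `InterpreterBoundError` plays the role of PyErr (an error tied to the interpreter that created it) and `PyErrorSketch` keeps only the rendered message. A `String` replaces anyhow::Error so the example has no external dependencies.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for PyErr: an error tied to its originating interpreter.
#[derive(Debug)]
pub struct InterpreterBoundError {
    #[allow(dead_code)]
    pub interpreter_id: u32,
    pub message: String,
}

impl fmt::Display for InterpreterBoundError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.message)
    }
}

/// Keeps only the rendered message, so no interpreter-bound value survives conversion.
#[derive(Debug)]
pub struct PyErrorSketch {
    message: String,
}

impl From<InterpreterBoundError> for PyErrorSketch {
    fn from(err: InterpreterBoundError) -> Self {
        // Render the message now; `err` is dropped at the end of this function,
        // i.e. at the conversion site rather than wherever the error ends up.
        PyErrorSketch { message: err.to_string() }
    }
}

impl fmt::Display for PyErrorSketch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.message)
    }
}

impl Error for PyErrorSketch {}

/// Hypothetical UDF step; `?` performs the From conversion above on failure.
pub fn run_udf_step(input: i64) -> Result<i64, PyErrorSketch> {
    let step: Result<i64, InterpreterBoundError> = if input >= 0 {
        Ok(input + 1)
    } else {
        Err(InterpreterBoundError {
            interpreter_id: 1,
            message: "ValueError: negative input".to_string(),
        })
    };
    Ok(step?)
}
```

Because the converted error owns only a `String`, it is `Send + Sync` and can safely travel to other threads or be boxed as a trait object.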
Usage
Used internally by the ThreadedUdfInterpreter to execute Python UDFs. Each Python UDF gets its own SubInterpreter running on a dedicated thread, providing complete isolation between UDFs.
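A one-thread-per-UDF design is commonly built from a worker thread plus channels. The sketch below is an assumption about the general shape, not ThreadedUdfInterpreter's actual code: `ThreadedUdfSketch` and `UdfState` are hypothetical names, the UDF body is a placeholder that doubles its input, and `Rc` makes the per-UDF state non-`Send`, mirroring a handle such as a raw `*mut PyThreadState`, which Rust also treats as non-`Send`.

```rust
use std::rc::Rc;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical per-UDF state; the Rc field makes it !Send, so it must stay on one thread.
struct UdfState {
    name: Rc<String>,
}

impl UdfState {
    // Placeholder UDF body: doubles non-negative inputs.
    fn call(&self, input: i64) -> Result<i64, String> {
        if input < 0 {
            return Err(format!("{}: negative input", self.name));
        }
        Ok(input * 2)
    }
}

// A request: the input plus a channel for the reply.
type Job = (i64, Sender<Result<i64, String>>);

/// Hypothetical handle to one dedicated worker thread per UDF.
pub struct ThreadedUdfSketch {
    tx: Option<Sender<Job>>,
    handle: Option<JoinHandle<()>>,
}

impl ThreadedUdfSketch {
    pub fn new(name: &str) -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        let name = name.to_string();
        let handle = thread::spawn(move || {
            // The state is created on, and never leaves, this thread.
            let state = UdfState { name: Rc::new(name) };
            for (input, reply) in rx {
                // Ignore send errors: the caller may have stopped waiting.
                let _ = reply.send(state.call(input));
            }
        });
        Self { tx: Some(tx), handle: Some(handle) }
    }

    pub fn call(&self, input: i64) -> Result<i64, String> {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .as_ref()
            .expect("worker is running")
            .send((input, reply_tx))
            .map_err(|e| e.to_string())?;
        reply_rx.recv().map_err(|e| e.to_string())?
    }
}

impl Drop for ThreadedUdfSketch {
    fn drop(&mut self) {
        // Dropping the sender ends the worker's receive loop; then join it.
        std::mem::drop(self.tx.take());
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```

Only `Send` values (the input and the reply channel) cross threads; the non-`Send` state is constructed inside the spawned closure, so the compiler enforces that it never migrates.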
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-udf/arroyo-udf-python/src/interpreter.rs
Signature
```rust
#[derive(Debug)]
pub struct SubInterpreter {
    state: *mut PyThreadState,
}

impl SubInterpreter {
    pub fn new() -> Result<Self, PyError>;

    pub fn with_gil<F, R>(&self, f: F) -> Result<R, PyError>
    where
        F: for<'py> FnOnce(Python<'py>) -> Result<R, PyError>;
}

impl Drop for SubInterpreter {
    fn drop(&mut self);
}

#[derive(Debug)]
pub struct PyError {
    anyhow: anyhow::Error,
}

impl From<PyErr> for PyError { ... }
impl From<anyhow::Error> for PyError { ... }
impl From<PyError> for anyhow::Error { ... }
```
Import
```rust
use arroyo_udf_python::interpreter::{SubInterpreter, PyError};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| closure | FnOnce(Python) -> Result<R, PyError> | Yes | A closure to execute within the sub-interpreter's GIL context |
Outputs
| Name | Type | Description |
|---|---|---|
| result | Result<R, PyError> | The return value from the closure, or a PyError if execution failed |
Usage Examples
```rust
use arroyo_udf_python::interpreter::SubInterpreter;
use pyo3::prelude::*; // brings PyAnyMethods (for `extract`) into scope

fn run_example() -> anyhow::Result<()> {
    // Create an isolated Python sub-interpreter
    let interpreter = SubInterpreter::new()?;

    // Execute Python code within the sub-interpreter; `?` converts PyErr
    // into PyError via the From<PyErr> impl
    let result = interpreter.with_gil(|py| {
        py.run_bound("x = 1 + 2", None, None)?;
        let x = py.eval_bound("x", None, None)?;
        Ok(x.extract::<i64>()?)
    })?;
    assert_eq!(result, 3);

    // `interpreter` is dropped here, which ends the sub-interpreter and frees its state
    Ok(())
}
```