Implementation: Apache Paimon FileIO
| Knowledge Sources | |
|---|---|
| Domains | File System, I/O Operations |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
FileIO provides an abstract file system interface for reading and writing files across different storage backends including local file systems, HDFS, S3, and OSS.
Description
The FileIO abstract base class defines a comprehensive interface for file system operations that works across multiple storage backends. It provides core operations like reading and writing files, listing directories, checking existence, creating directories, deleting files and directories, and renaming files. The interface abstracts away the differences between local file systems, cloud object stores, and distributed file systems.
The class includes convenience methods built on top of the core operations: read_file_utf8() and write_file() for text file I/O, copy_file() and copy_files() for copying with optional overwrite, try_to_write_atomic() for atomic file creation via a temporary file and rename, and delete_quietly() and delete_directory_quietly(), which suppress errors so cleanup paths remain resilient.
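The temporary-file-and-rename pattern behind try_to_write_atomic() can be sketched in plain Python. This is a simplified, hypothetical helper (not Paimon's actual implementation): the content is written to a uniquely named temporary file first, then renamed into place, so readers never observe a half-written file.

```python
import os
import uuid


def try_to_write_atomic_sketch(path: str, content: str) -> bool:
    """Write content to path atomically via a temp file plus rename.

    Hypothetical sketch of the pattern, not Paimon's code. Rename is
    atomic on POSIX local file systems; object stores generally lack
    atomic rename, which is why the real method returns a bool.
    """
    tmp_path = f"{path}.{uuid.uuid4().hex}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(content)
        os.replace(tmp_path, path)  # atomic replace on the local file system
        return True
    except OSError:
        # Best-effort cleanup of the temp file, mirroring delete_quietly()
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
```

On backends without atomic rename, a caller would fall back to checking the boolean result rather than assuming all-or-nothing semantics.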
FileIO supports operations specific to table formats like write_parquet(), write_orc(), write_avro(), write_lance(), and write_blob() for different file formats, though these are marked for implementation by subclasses. The static get() factory method automatically selects between LocalFileIO for file:// URIs and PyArrowFileIO for remote storage (s3://, hdfs://, oss://, etc.), enabling transparent file system abstraction throughout Paimon.
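The scheme-based dispatch performed by get() can be sketched as follows. select_file_io_backend is a hypothetical helper returning the name of the class the factory would instantiate; the real factory also threads catalog_options through to the chosen implementation.

```python
from urllib.parse import urlparse


def select_file_io_backend(path: str) -> str:
    """Sketch of get()'s scheme dispatch (helper name is hypothetical).

    Paths with no scheme, or with file://, map to the local backend;
    remote schemes (s3, hdfs, oss, ...) map to the PyArrow-backed one.
    """
    scheme = urlparse(path).scheme
    if scheme in ("", "file"):
        return "LocalFileIO"
    return "PyArrowFileIO"
```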
Usage
Use FileIO when implementing storage operations that need to work across multiple file system types, abstracting file system differences in catalog and table implementations, or building portable file access layers.
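To illustrate what implementing a new backend involves, here is a deliberately tiny in-memory backend written against a simplified stand-in for the abstract base class. All class names below are hypothetical; the real base class is pypaimon.common.file_io.FileIO and defines many more methods.

```python
import io
from abc import ABC, abstractmethod


class _CapturingStream(io.BytesIO):
    """BytesIO that copies its contents into a backing store on close."""

    def __init__(self, store: dict, path: str):
        super().__init__()
        self._store = store
        self._path = path

    def close(self):
        self._store[self._path] = self.getvalue()
        super().close()


class MiniFileIO(ABC):
    """Simplified stand-in for the FileIO ABC (three methods only)."""

    @abstractmethod
    def new_input_stream(self, path: str): ...

    @abstractmethod
    def new_output_stream(self, path: str): ...

    @abstractmethod
    def exists(self, path: str) -> bool: ...


class InMemoryFileIO(MiniFileIO):
    """Toy backend storing file contents in a dict, e.g. for unit tests."""

    def __init__(self):
        self._files = {}

    def new_output_stream(self, path: str):
        return _CapturingStream(self._files, path)

    def new_input_stream(self, path: str):
        return io.BytesIO(self._files[path])

    def exists(self, path: str) -> bool:
        return path in self._files
```

Because callers depend only on the abstract surface, the same catalog or table code runs unchanged against this toy backend, the local file system, or a remote object store.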
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/common/file_io.py
Signature
from abc import ABC, abstractmethod
from typing import Optional

from pypaimon.common.options import Options


class FileIO(ABC):
    """File IO interface to read and write files."""

    @abstractmethod
    def new_input_stream(self, path: str):
        pass

    @abstractmethod
    def new_output_stream(self, path: str):
        pass

    @abstractmethod
    def get_file_status(self, path: str):
        pass

    @abstractmethod
    def list_status(self, path: str):
        pass

    @abstractmethod
    def exists(self, path: str) -> bool:
        pass

    @abstractmethod
    def delete(self, path: str, recursive: bool = False) -> bool:
        pass

    @abstractmethod
    def mkdirs(self, path: str) -> bool:
        pass

    @abstractmethod
    def rename(self, src: str, dst: str) -> bool:
        pass

    def read_file_utf8(self, path: str) -> str:
        pass

    def write_file(self, path: str, content: str, overwrite: bool = False):
        pass

    def try_to_write_atomic(self, path: str, content: str) -> bool:
        pass

    def copy_file(self, source_path: str, target_path: str, overwrite: bool = False):
        pass

    def delete_quietly(self, path: str):
        pass

    def delete_directory_quietly(self, directory: str):
        pass

    @staticmethod
    def get(path: str, catalog_options: Optional[Options] = None) -> 'FileIO':
        pass
Import
from pypaimon.common.file_io import FileIO
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| path | str | Yes | File or directory path (can be URI) |
| src | str | Yes | Source path for rename/copy operations |
| dst | str | Yes | Destination path for rename/copy operations |
| recursive | bool | No | Whether to delete recursively (default: False) |
| overwrite | bool | No | Whether to overwrite existing files (default: False) |
| content | str | Yes | Text content to write to file |
| catalog_options | Options | No | Catalog options for FileIO configuration |
Outputs
| Name | Type | Description |
|---|---|---|
| stream | InputStream/OutputStream | File stream for reading or writing |
| exists | bool | Whether file or directory exists |
| success | bool | Whether operation succeeded |
| content | str | UTF-8 decoded file content |
| file_io | FileIO | FileIO instance for the given path |
Usage Examples
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
# Get appropriate FileIO for different paths
local_io = FileIO.get("/tmp/data") # Returns LocalFileIO
s3_io = FileIO.get("s3://bucket/data") # Returns PyArrowFileIO
hdfs_io = FileIO.get("hdfs://namenode:9000/data") # Returns PyArrowFileIO
# Read and write text files
file_io = FileIO.get("/tmp/data")
file_io.write_file("/tmp/data/config.txt", "key=value\n")
content = file_io.read_file_utf8("/tmp/data/config.txt")
print(content) # "key=value\n"
# Atomic write with temporary file
success = file_io.try_to_write_atomic("/tmp/data/important.txt", "critical data")
if success:
print("File written atomically")
# Copy files
file_io.copy_file(
"/tmp/source/file.txt",
"/tmp/dest/file.txt",
overwrite=True
)
# Directory operations
file_io.mkdirs("/tmp/data/subdir")
if file_io.exists("/tmp/data/subdir"):
print("Directory created")
# List files
file_infos = file_io.list_status("/tmp/data")
for info in file_infos:
print(f"{info.path}: {info.size} bytes")
# Safe deletion with error suppression
file_io.delete_quietly("/tmp/data/may_not_exist.txt")
file_io.delete_directory_quietly("/tmp/data/old_dir")
# Stream operations
with file_io.new_input_stream("/tmp/data/input.bin") as in_stream:
data = in_stream.read()
with file_io.new_output_stream("/tmp/data/output.bin") as out_stream:
out_stream.write(b"binary data")
# Check file properties
if file_io.exists("/tmp/data/file.txt"):
size = file_io.get_file_size("/tmp/data/file.txt")
is_directory = file_io.is_dir("/tmp/data")
print(f"File size: {size}, Is directory: {is_directory}")
# Rename files
file_io.rename("/tmp/data/old.txt", "/tmp/data/new.txt")