Implementation:Apache Paimon UriReader
| Knowledge Sources | |
|---|---|
| Domains | File I/O, URI Handling |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
UriReader provides an abstraction for reading data from URIs. Pluggable reader implementations cover HTTP/HTTPS endpoints and file system paths, and a factory caches readers for reuse.
Description
The UriReader module defines an abstract interface for reading data from URIs regardless of the underlying protocol. The base UriReader class provides factory methods for creating protocol-specific readers: from_http() for HTTP/HTTPS URLs and from_file() for file system access. The get_file_path() utility method normalizes URIs to file system paths by stripping scheme information.
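The scheme-stripping step can be illustrated with a minimal sketch built on the standard library's urllib.parse. The helper name strip_scheme is hypothetical and this is not the Paimon implementation; the rule that non-file schemes keep their authority as the first path segment is an assumption chosen to match the get_file_path() examples later on this page.

```python
from urllib.parse import urlparse


def strip_scheme(uri: str) -> str:
    """Hypothetical helper: drop the scheme from a URI and return a path-like string."""
    parsed = urlparse(uri)
    if parsed.scheme == "file":
        # file:///tmp/x -> /tmp/x (the authority is empty, so only the path remains)
        return parsed.path
    if parsed.scheme:
        # Assumption: s3://bucket/key -> bucket/key (authority kept as first segment)
        return parsed.netloc + parsed.path
    # No scheme at all: treat the input as a plain file system path.
    return uri


local_path = strip_scheme("file:///tmp/data/table.parquet")
object_path = strip_scheme("s3://bucket/key/file.txt")
plain_path = strip_scheme("/local/path/file.txt")
```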
FileUriReader wraps a FileIO instance to handle file:// URIs and local paths, delegating to the underlying file system implementation. HttpUriReader uses the requests library to fetch content from HTTP/HTTPS endpoints; it validates the response status code and wraps the content in a BytesIO stream so that both readers expose the same stream interface.
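The validate-then-wrap pattern described for HttpUriReader might look roughly like the sketch below. It deliberately takes the status code and body as plain arguments so it runs without network access; the helper name wrap_http_body, the "only 200 is success" rule, and the use of RuntimeError are all assumptions for illustration, not confirmed details of the Paimon code.

```python
import io


def wrap_http_body(uri: str, status_code: int, body: bytes) -> io.BytesIO:
    """Hypothetical helper: validate an HTTP status and expose the body as a stream."""
    # Assumption: anything other than 200 OK is treated as a failure.
    if status_code != 200:
        raise RuntimeError(f"Failed to read {uri}: HTTP status {status_code}")
    # BytesIO supports read() and the context-manager protocol, like an open file.
    return io.BytesIO(body)


with wrap_http_body("https://example.com/data.json", 200, b'{"ok": true}') as stream:
    payload = stream.read()
```

With the requests library, the two values would typically come from response.status_code and response.content.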
UriReaderFactory centralizes reader creation and keeps readers in a thread-safe LRU cache keyed by UriKey (scheme + authority), so URIs that share a scheme and authority reuse one reader instead of instantiating a new FileIO each time. The factory parses the URI, dispatches to the appropriate reader implementation, and automatically instantiates a FileIO for non-HTTP schemes. This design shares resources efficiently while supporting diverse URI schemes, including file://, http://, https://, s3://, hdfs://, and oss://.
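The caching idea can be sketched as a small LRU map guarded by a lock. Everything here is illustrative: the class name ReaderCache, the max_size parameter, and the build callback are hypothetical, and the real factory may size, key, or evict its cache differently.

```python
import threading
from collections import OrderedDict
from typing import Callable, Optional, Tuple
from urllib.parse import urlparse

Key = Tuple[Optional[str], Optional[str]]


class ReaderCache:
    """Hypothetical thread-safe LRU cache keyed by (scheme, authority)."""

    def __init__(self, max_size: int = 100) -> None:
        self._max_size = max_size
        self._entries = OrderedDict()  # maps Key -> cached reader object
        self._lock = threading.Lock()

    @staticmethod
    def key_for(uri: str) -> Key:
        parsed = urlparse(uri)
        # Empty components become None so "/local/path" maps to (None, None).
        return (parsed.scheme or None, parsed.netloc or None)

    def get_or_create(self, uri: str, build: Callable[[str], object]) -> object:
        key = self.key_for(uri)
        with self._lock:
            if key in self._entries:
                # Cache hit: mark the entry as most recently used.
                self._entries.move_to_end(key)
                return self._entries[key]
            reader = build(uri)
            self._entries[key] = reader
            if len(self._entries) > self._max_size:
                # Evict the least recently used entry.
                self._entries.popitem(last=False)
            return reader

    def __len__(self) -> int:
        with self._lock:
            return len(self._entries)


cache = ReaderCache(max_size=2)
a1 = cache.get_or_create("s3://bucket-a/file1.txt", lambda uri: object())
a2 = cache.get_or_create("s3://bucket-a/file2.txt", lambda uri: object())
b = cache.get_or_create("s3://bucket-b/file.txt", lambda uri: object())
```

Holding the lock while build runs keeps the sketch simple and guarantees one reader per key, at the cost of serializing reader construction across threads.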
Usage
Use UriReader when implementing components that read data from several kinds of URI sources, when abstracting away protocol differences in blob storage or data lake integrations, or when building systems that access the same URI locations repeatedly and benefit from cached readers.
Code Reference
Source Location
- Repository: Apache_Paimon
- File: paimon-python/pypaimon/common/uri_reader.py
Signature
class UriReader(ABC):
    @classmethod
    def from_http(cls) -> 'HttpUriReader':
        pass

    @classmethod
    def from_file(cls, file_io: Any) -> 'FileUriReader':
        pass

    @classmethod
    def get_file_path(cls, uri: str):
        pass

    @abstractmethod
    def new_input_stream(self, uri: str):
        pass


class FileUriReader(UriReader):
    def __init__(self, file_io: Any):
        pass

    def new_input_stream(self, uri: str):
        pass


class HttpUriReader(UriReader):
    def new_input_stream(self, uri: str):
        pass


class UriKey:
    def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None:
        pass


class UriReaderFactory:
    def __init__(self, catalog_options: Union[Options, dict]) -> None:
        pass

    def create(self, input_uri: str) -> UriReader:
        pass

    def clear_cache(self) -> None:
        pass

    def get_cache_size(self) -> int:
        pass
Import
from pypaimon.common.uri_reader import UriReader, UriReaderFactory, FileUriReader, HttpUriReader
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| uri | str | Yes | URI to read from (file://, http://, s3://, etc.) |
| file_io | FileIO | Yes | FileIO instance for file-based readers |
| catalog_options | Options or dict | Yes | Catalog options for factory configuration |
Outputs
| Name | Type | Description |
|---|---|---|
| stream | file-like object | Input stream for reading URI content (a BytesIO for HTTP/HTTPS) |
| reader | UriReader | UriReader implementation for the URI scheme |
| file_path | str | Normalized file system path |
Usage Examples
import concurrent.futures

from pypaimon.common.uri_reader import UriReader, UriReaderFactory
from pypaimon.common.options import Options
from pypaimon.common.file_io import FileIO

# Create HTTP reader
http_reader = UriReader.from_http()
with http_reader.new_input_stream("https://example.com/data.json") as stream:
    content = stream.read()
    print(f"Downloaded {len(content)} bytes")

# Create file reader
file_io = FileIO.get("/tmp/data")
file_reader = UriReader.from_file(file_io)
with file_reader.new_input_stream("/tmp/data/file.txt") as stream:
    data = stream.read()

# Use factory for automatic reader selection
options = Options({"warehouse": "/path/to/warehouse"})
factory = UriReaderFactory(options)

# HTTP URI - creates HttpUriReader
reader = factory.create("https://storage.example.com/manifest.json")
with reader.new_input_stream("https://storage.example.com/manifest.json") as stream:
    manifest = stream.read()

# File URI - creates FileUriReader with appropriate FileIO
reader = factory.create("file:///tmp/local/data.parquet")
with reader.new_input_stream("file:///tmp/local/data.parquet") as stream:
    parquet_data = stream.read()

# S3 URI - creates FileUriReader with PyArrowFileIO
reader = factory.create("s3://bucket/path/to/file.orc")
with reader.new_input_stream("s3://bucket/path/to/file.orc") as stream:
    orc_data = stream.read()

# Factory caches readers by scheme and authority
reader1 = factory.create("s3://bucket-a/file1.txt")
reader2 = factory.create("s3://bucket-a/file2.txt")
# Both use same cached reader for s3://bucket-a
reader3 = factory.create("s3://bucket-b/file.txt")
# Different bucket, creates new reader

# Cache management
print(f"Cache size: {factory.get_cache_size()}")
factory.clear_cache()  # Clear all cached readers

# Extract file path from URI
path = UriReader.get_file_path("file:///tmp/data/table.parquet")
print(path)  # "/tmp/data/table.parquet"
path = UriReader.get_file_path("s3://bucket/key/file.txt")
print(path)  # "bucket/key/file.txt"
path = UriReader.get_file_path("/local/path/file.txt")
print(path)  # "/local/path/file.txt"

# Error handling
try:
    reader = UriReader.from_http()
    with reader.new_input_stream("https://invalid-url.example.com/404") as stream:
        data = stream.read()
except RuntimeError as e:
    print(f"HTTP error: {e}")

try:
    file_io = FileIO.get("/nonexistent")
    reader = UriReader.from_file(file_io)
    with reader.new_input_stream("/nonexistent/file.txt") as stream:
        data = stream.read()
except IOError as e:
    print(f"File error: {e}")

# Thread-safe factory usage
def read_uri(uri: str):
    reader = factory.create(uri)
    with reader.new_input_stream(uri) as stream:
        return len(stream.read())

uris = [
    "https://example.com/file1.txt",
    "https://example.com/file2.txt",
    "s3://bucket/file3.txt",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
    sizes = list(executor.map(read_uri, uris))
print(f"File sizes: {sizes}")