Implementation:Apache Paimon UriReader

From Leeroopedia


Knowledge Sources
Domains: File I/O, URI Handling
Last Updated: 2026-02-08 00:00 GMT

Overview

UriReader provides an abstraction for reading data from URIs. It supports HTTP/HTTPS endpoints and file system paths through pluggable reader implementations, and a factory caches the readers it creates.

Description

The UriReader module defines an abstract interface for reading data from URIs regardless of the underlying protocol. The base UriReader class provides factory methods for creating protocol-specific readers: from_http() for HTTP/HTTPS URLs and from_file() for file system access. The get_file_path() utility method normalizes URIs to file system paths by stripping scheme information.
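
The scheme-stripping behavior of get_file_path() can be illustrated with a minimal sketch. The function name sketch_get_file_path is hypothetical, and the use of urllib.parse is an assumption; the actual pypaimon implementation may differ in edge cases such as Windows drive letters.

```python
from urllib.parse import urlparse


def sketch_get_file_path(uri: str) -> str:
    """Hypothetical stand-in for UriReader.get_file_path()."""
    parsed = urlparse(uri)
    if not parsed.scheme:
        # No scheme: treat the input as a plain file system path.
        return uri
    # netloc is empty for file:///..., and holds the bucket for s3://bucket/...
    return parsed.netloc + parsed.path


print(sketch_get_file_path("file:///tmp/data/table.parquet"))
print(sketch_get_file_path("s3://bucket/key/file.txt"))
print(sketch_get_file_path("/local/path/file.txt"))
```

Under these assumptions the sketch reproduces the three results listed in the usage examples on this page.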

FileUriReader wraps a FileIO instance to handle file:// URIs and local paths, delegating to the underlying file system implementation. HttpUriReader uses the requests library to fetch content from HTTP/HTTPS endpoints, validates the response status code, and wraps the content in a BytesIO stream so that both readers expose the same stream interface.
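
The status check and BytesIO wrapping can be sketched without network access. The classes SketchReader and SketchHttpReader and the fake_fetch helper are hypothetical; the real HttpUriReader calls the requests library directly, whereas this sketch injects a fetch callable so it can run offline. The check for status 200 and the RuntimeError are also assumptions.

```python
import io
from abc import ABC, abstractmethod
from typing import BinaryIO, Callable, Tuple


class SketchReader(ABC):
    """Hypothetical mirror of the abstract reader interface."""

    @abstractmethod
    def new_input_stream(self, uri: str) -> BinaryIO:
        ...


class SketchHttpReader(SketchReader):
    def __init__(self, fetch: Callable[[str], Tuple[int, bytes]]) -> None:
        # fetch(uri) returns (status_code, body); it stands in for an HTTP GET.
        self._fetch = fetch

    def new_input_stream(self, uri: str) -> BinaryIO:
        status, body = self._fetch(uri)
        if status != 200:
            # Assumed failure mode: reject any non-200 response.
            raise RuntimeError(f"Failed to read {uri}: HTTP {status}")
        # Wrap the bytes so callers get a file-like, context-managed stream.
        return io.BytesIO(body)


def fake_fetch(uri: str) -> Tuple[int, bytes]:
    """Offline stand-in for a real HTTP request."""
    if uri.endswith("/ok"):
        return 200, b"hello"
    return 404, b""


reader = SketchHttpReader(fake_fetch)
with reader.new_input_stream("https://example.com/ok") as stream:
    print(stream.read())
```

Returning a BytesIO means callers can use the same with-statement and read() calls regardless of which reader produced the stream.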

UriReaderFactory provides centralized reader creation with caching through an LRU cache. It maintains a thread-safe cache of readers keyed by UriKey (scheme + authority), avoiding redundant FileIO instantiation for repeated URI patterns. The factory handles URI parsing, dispatches to appropriate reader implementations, and automatically instantiates FileIO instances for non-HTTP schemes. This design enables efficient resource sharing while supporting diverse URI schemes including file://, http://, https://, s3://, hdfs://, and oss://.
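
The caching behavior described above can be sketched as a lock-protected LRU map keyed by scheme and authority. SketchReaderCache, its make_reader callback, and the max_size default are hypothetical; the real factory's cache size, key class, and treatment of HTTP readers are not specified on this page.

```python
import threading
from collections import OrderedDict
from urllib.parse import urlparse


class SketchReaderCache:
    """Hypothetical thread-safe LRU cache of readers keyed by (scheme, authority)."""

    def __init__(self, make_reader, max_size: int = 100) -> None:
        self._make_reader = make_reader  # callable(scheme, authority) -> reader
        self._max_size = max_size        # assumed limit, not the library's value
        self._lock = threading.Lock()
        self._readers = OrderedDict()

    def create(self, uri: str):
        parsed = urlparse(uri)
        key = (parsed.scheme or None, parsed.netloc or None)
        with self._lock:
            if key in self._readers:
                # Cache hit: mark the entry as most recently used.
                self._readers.move_to_end(key)
                return self._readers[key]
            reader = self._make_reader(*key)
            self._readers[key] = reader
            if len(self._readers) > self._max_size:
                # Evict the least recently used entry.
                self._readers.popitem(last=False)
            return reader

    def get_cache_size(self) -> int:
        with self._lock:
            return len(self._readers)

    def clear_cache(self) -> None:
        with self._lock:
            self._readers.clear()


cache = SketchReaderCache(lambda scheme, authority: object(), max_size=2)
first = cache.create("s3://bucket-a/file1.txt")
second = cache.create("s3://bucket-a/file2.txt")
print(first is second)
```

Keying on scheme and authority rather than the full URI is what lets two objects in the same bucket share one reader.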

Usage

Use UriReader when a component must read data from several kinds of URI sources, when blob storage or data lake integrations should not depend on protocol differences, or when repeated access to the same scheme and authority benefits from cached readers.

Code Reference

Source Location

Signature

class UriReader(ABC):
    @classmethod
    def from_http(cls) -> 'HttpUriReader':
        pass

    @classmethod
    def from_file(cls, file_io: Any) -> 'FileUriReader':
        pass

    @classmethod
    def get_file_path(cls, uri: str):
        pass

    @abstractmethod
    def new_input_stream(self, uri: str):
        pass

class FileUriReader(UriReader):
    def __init__(self, file_io: Any):
        pass

    def new_input_stream(self, uri: str):
        pass

class HttpUriReader(UriReader):
    def new_input_stream(self, uri: str):
        pass

class UriKey:
    def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None:
        pass

class UriReaderFactory:
    def __init__(self, catalog_options: Union[Options, dict]) -> None:
        pass

    def create(self, input_uri: str) -> UriReader:
        pass

    def clear_cache(self) -> None:
        pass

    def get_cache_size(self) -> int:
        pass

Import

from pypaimon.common.uri_reader import UriReader, UriReaderFactory, FileUriReader, HttpUriReader

I/O Contract

Inputs

Name | Type | Required | Description
uri | str | Yes | URI to read from (file://, http://, s3://, etc.)
file_io | FileIO | Yes | FileIO instance for file-based readers
catalog_options | Options or dict | Yes | Catalog options for factory configuration

Outputs

Name | Type | Description
stream | InputStream | Input stream for reading URI content
reader | UriReader | UriReader implementation for the URI scheme
file_path | str | Normalized file system path

Usage Examples

from pypaimon.common.uri_reader import UriReader, UriReaderFactory
from pypaimon.common.options import Options
from pypaimon.common.file_io import FileIO

# Create HTTP reader
http_reader = UriReader.from_http()
with http_reader.new_input_stream("https://example.com/data.json") as stream:
    content = stream.read()
    print(f"Downloaded {len(content)} bytes")

# Create file reader
file_io = FileIO.get("/tmp/data")
file_reader = UriReader.from_file(file_io)
with file_reader.new_input_stream("/tmp/data/file.txt") as stream:
    data = stream.read()

# Use factory for automatic reader selection
options = Options({"warehouse": "/path/to/warehouse"})
factory = UriReaderFactory(options)

# HTTP URI - creates HttpUriReader
reader = factory.create("https://storage.example.com/manifest.json")
with reader.new_input_stream("https://storage.example.com/manifest.json") as stream:
    manifest = stream.read()

# File URI - creates FileUriReader with appropriate FileIO
reader = factory.create("file:///tmp/local/data.parquet")
with reader.new_input_stream("file:///tmp/local/data.parquet") as stream:
    parquet_data = stream.read()

# S3 URI - creates FileUriReader with PyArrowFileIO
reader = factory.create("s3://bucket/path/to/file.orc")
with reader.new_input_stream("s3://bucket/path/to/file.orc") as stream:
    orc_data = stream.read()

# Factory caches readers by scheme and authority
reader1 = factory.create("s3://bucket-a/file1.txt")
reader2 = factory.create("s3://bucket-a/file2.txt")
# Both use same cached reader for s3://bucket-a

reader3 = factory.create("s3://bucket-b/file.txt")
# Different bucket, creates new reader

# Cache management
print(f"Cache size: {factory.get_cache_size()}")
factory.clear_cache()  # Clear all cached readers

# Extract file path from URI
path = UriReader.get_file_path("file:///tmp/data/table.parquet")
print(path)  # "/tmp/data/table.parquet"

path = UriReader.get_file_path("s3://bucket/key/file.txt")
print(path)  # "bucket/key/file.txt"

path = UriReader.get_file_path("/local/path/file.txt")
print(path)  # "/local/path/file.txt"

# Error handling
try:
    reader = UriReader.from_http()
    with reader.new_input_stream("https://invalid-url.example.com/404") as stream:
        data = stream.read()
except (RuntimeError, OSError) as e:
    # OSError also covers requests' connection errors if they propagate unwrapped
    print(f"HTTP error: {e}")

try:
    file_io = FileIO.get("/nonexistent")
    reader = UriReader.from_file(file_io)
    with reader.new_input_stream("/nonexistent/file.txt") as stream:
        data = stream.read()
except IOError as e:
    print(f"File error: {e}")

# Thread-safe factory usage
import concurrent.futures

def read_uri(uri: str):
    reader = factory.create(uri)
    with reader.new_input_stream(uri) as stream:
        return len(stream.read())

uris = [
    "https://example.com/file1.txt",
    "https://example.com/file2.txt",
    "s3://bucket/file3.txt"
]

with concurrent.futures.ThreadPoolExecutor() as executor:
    sizes = list(executor.map(read_uri, uris))
    print(f"File sizes: {sizes}")
