Implementation:Mage ai Mage ai Couchbase Source
| Knowledge Sources | |
|---|---|
| Domains | Data_Integration, Couchbase, Source_Connector, SQL_Based |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
A concrete tool, provided by the Mage integrations source connector framework, for extracting data from Couchbase collections.
Description
The Couchbase source connector extends the SQL-based Source class (mage_integrations.sources.sql.base.Source) to extract data from Couchbase NoSQL databases. It connects through a CouchbaseConnection wrapper using a connection string, bucket, scope, username, and password.
Discovery enumerates all collections in the configured scope and uses Couchbase's INFER statement, with a configurable sample size (1000 documents) and similarity metric, to determine document schemas. The connector supports two schema strategies. INFER mode (the default when a single schema is detected) selects the most common document structure and maps each field to its inferred type; fields whose type is missing from the INFER results are mapped to string. COMBINE mode (the default when multiple schemas are detected) collapses each document into a single _document column of type object.
The connector overrides update_column_names() to replace the _document column with * in SQL queries, and _convert_to_rows() to unwrap Couchbase's nested response format. The test_connection() method verifies connectivity by retrieving the bucket. A custom _build_comparison_statement() supports quoted column names and datetime conversion.
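The strategy selection described above can be sketched in plain Python. This is a minimal illustration and not the connector's actual code: the function name build_schema is hypothetical, and the input is assumed to be a list of INFER "flavors", each carrying a "#docs" count and a "properties" mapping.

```python
from typing import Any, Dict, List, Optional

DOCUMENT_COLUMN = "_document"  # column name taken from the description above


def build_schema(
    flavors: List[Dict[str, Any]],
    strategy: Optional[str] = None,
) -> Dict[str, Any]:
    """Turn INFER-style flavors into a flat {column: type} mapping (hypothetical helper)."""
    if strategy is None:
        # Default described above: infer for a single schema, combine otherwise.
        strategy = "infer" if len(flavors) == 1 else "combine"

    if strategy == "combine":
        # Every document is exposed through one object-typed column.
        return {DOCUMENT_COLUMN: "object"}

    if strategy != "infer":
        raise ValueError(f"unknown strategy: {strategy!r}")

    # Pick the most common document structure by its document count.
    most_common = max(flavors, key=lambda flavor: flavor.get("#docs", 0), default={})
    return {
        # A field with no reported type falls back to string.
        name: details.get("type") or "string"
        for name, details in most_common.get("properties", {}).items()
    }
```

Passing strategy explicitly mirrors the optional strategy configuration key; leaving it as None reproduces the default behaviour described in the text.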
Usage
Use this source connector when building a Mage data pipeline that needs to extract data from Couchbase. Configure it with connection_string, bucket, scope, username, and password. Optionally set strategy to infer or combine to control how schemas are discovered.
Code Reference
Source Location
- Repository: mage-ai
- File: mage_integrations/mage_integrations/sources/couchbase/__init__.py
- Lines: 1-140
Signature
class Couchbase(Source):
    def build_connection(self) -> CouchbaseConnection:
        ...
    def discover(self, streams: List[str] = None) -> Catalog:
        ...
    def test_connection(self):
        ...
    def update_column_names(self, columns):
        ...
    def _convert_to_rows(self, columns, rows_temp):
        ...
    def _build_comparison_statement(self, col, val, properties, operator='=') -> str:
        ...
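To make the roles of update_column_names() and _convert_to_rows() more concrete, here is a standalone sketch of the two ideas. It is not the connector's implementation: the function names are hypothetical, and the row shape is an assumption based on how a SQL++ SELECT * wraps each document under its keyspace name.

```python
from typing import Any, Dict, List

DOCUMENT_COLUMN = "_document"  # column name taken from the description


def replace_document_column(columns: List[str]) -> List[str]:
    """Swap the synthetic _document column for * so the query selects whole documents."""
    return ["*" if column == DOCUMENT_COLUMN else column for column in columns]


def unwrap_rows(columns: List[str], raw_rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Unwrap rows returned by SELECT * (assumed shape: {"<collection>": {...}})."""
    if columns != ["*"]:
        # Explicit column lists already come back as flat records.
        return raw_rows

    rows = []
    for raw in raw_rows:
        # Take the single nested document, whatever the collection is called.
        document = next(iter(raw.values()), {})
        rows.append({DOCUMENT_COLUMN: document})
    return rows
```

For example, unwrap_rows(["*"], [{"users": {"id": 1}}]) yields one record whose _document value is the original document.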
Import
from mage_integrations.sources.couchbase import Couchbase
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| config | dict | Yes | Configuration dictionary with Couchbase connection credentials |
| catalog | Catalog | No | Singer catalog specifying streams to extract |
| state | dict | No | Previous sync state for incremental extraction |
Configuration Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| connection_string | str | Yes | Couchbase connection string (e.g., couchbase://localhost) |
| bucket | str | Yes | Couchbase bucket name |
| scope | str | Yes | Couchbase scope name within the bucket |
| username | str | Yes | Couchbase username for authentication |
| password | str | Yes | Couchbase password for authentication |
| strategy | str | No | Schema discovery strategy: 'infer' (per-field types) or 'combine' (single document column) |
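Before handing a configuration to the connector, it can help to check it against the table above. The helper below is a hypothetical pre-flight check written for illustration; it is not part of mage_integrations.

```python
from typing import Any, Dict, List

# Keys marked "Yes" in the Required column of the table above.
REQUIRED_KEYS = ("connection_string", "bucket", "scope", "username", "password")
ALLOWED_STRATEGIES = ("infer", "combine")


def find_config_problems(config: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the config looks complete."""
    problems = [
        f"missing required key: {key}"
        for key in REQUIRED_KEYS
        if not config.get(key)
    ]

    # strategy is optional, but if present it should be one of the two known values.
    strategy = config.get("strategy")
    if strategy is not None and strategy not in ALLOWED_STRATEGIES:
        problems.append(f"unknown strategy: {strategy!r}")

    return problems
```

Returning a list rather than raising on the first error lets a caller report every missing key at once.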
Outputs
| Name | Type | Description |
|---|---|---|
| catalog | Catalog | Discovered collections with schemas inferred via INFER statement (from discover()) |
| records | Generator[List[Dict]] | Batches of extracted records via SQL++ queries (inherited from SQL base Source) |
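For readers unfamiliar with the INFER statement mentioned in the catalog row, the sketch below assembles one for a single collection. The function name is hypothetical, and the exact statement the connector issues may differ; sample_size and similarity_metric are option names from Couchbase's INFER syntax, and the similarity value in the usage note is purely illustrative.

```python
import json


def build_infer_statement(
    bucket: str,
    scope: str,
    collection: str,
    similarity_metric: float,
    sample_size: int = 1000,  # sample size quoted in the description
) -> str:
    """Build an INFER statement for one collection (hypothetical helper)."""
    # Backtick-quote each path segment so names such as my-bucket stay valid.
    # Names that themselves contain backticks are not handled in this sketch.
    keyspace = ".".join(f"`{part}`" for part in (bucket, scope, collection))
    options = json.dumps(
        {"sample_size": sample_size, "similarity_metric": similarity_metric}
    )
    return f"INFER {keyspace} WITH {options}"
```

Calling build_infer_statement("my-bucket", "_default", "users", similarity_metric=0.5) produces a statement that targets the users collection in the _default scope.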
Usage Examples
from mage_integrations.sources.couchbase import Couchbase

config = {
    "connection_string": "couchbase://localhost",
    "bucket": "my-bucket",
    "scope": "_default",
    "username": "admin",
    "password": "password123",
    "strategy": "infer",
}
source = Couchbase(config=config)

# Discover available streams
catalog = source.discover()

# Test connection
source.test_connection()
Related Pages
Implements Principle
- Principle:Mage_ai_Mage_ai_Source_Lifecycle_Orchestration
- Principle:Mage_ai_Mage_ai_SQL_Schema_Discovery