Implementation:TobikoData Sqlmesh Lineage Worker
| Knowledge Sources | |
|---|---|
| Domains | Web_UI, Data_Lineage, Web_Workers |
| Last Updated | 2026-02-07 20:00 GMT |
Overview
Web Worker for computing model dependency graphs and lineage connections in the SQLMesh web UI.
Description
The Lineage_Worker is a dedicated Web Worker that processes model lineage data off the main thread. It computes upstream and downstream dependencies, connected node graphs, and column-level lineage relationships. Recursive graph traversal builds the complete dependency chains that the SQLMesh UI uses to visualize data flow, covering both model-level and column-level lineage.
Usage
Use this worker to compute lineage graphs for the SQLMesh visualization UI. Post lineage data with a main node identifier, and receive back a complete graph of connected nodes with their relationships.
Code Reference
Source Location
- Repository: TobikoData_Sqlmesh
- File: web/client/src/workers/lineage.ts
Message Protocol
```typescript
// Input message
{
  topic: 'lineage',
  payload: {
    currentLineage: Record<string, Lineage>,
    newLineage: Record<string, string[]>,
    mainNode: string
  }
}

// Output message
{
  topic: 'lineage',
  payload: {
    lineage: Record<string, Lineage>,
    nodesConnections: Record<string, ConnectedNode>
  }
}

// Error message
{
  topic: 'error',
  error: Error
}

// Types
interface Lineage {
  models: string[]
  columns?: Record<string, LineageColumn>
}

interface ConnectedNode {
  id?: Optional<string>
  edges: ConnectedNode[]
}
```
Import
```typescript
// Worker is typically loaded via URL, not direct import
const lineageWorker = new Worker(
  new URL('./workers/lineage.ts', import.meta.url),
  { type: 'module' }
)
```
I/O Contract
Input Message
| Field | Type | Required | Description |
|---|---|---|---|
| topic | 'lineage' | Yes | Message type identifier |
| payload.currentLineage | Record<string, Lineage> | Yes | Existing lineage data with column info |
| payload.newLineage | Record<string, string[]> | Yes | New model relationships (model -> dependencies) |
| payload.mainNode | string | Yes | Starting node for graph traversal |
Output Message
| Field | Type | Description |
|---|---|---|
| topic | 'lineage' or 'error' | Message type identifier |
| payload.lineage | Record<string, Lineage> | Merged lineage with models and columns |
| payload.nodesConnections | Record<string, ConnectedNode> | Graph of connected nodes with edges |
| error | Error | Error object if topic is 'error' |
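The contract above can be modelled as a discriminated union on `topic`. The sketch below is an illustration only, not the worker's actual code: `toResponse` and its `compute` callback are hypothetical names, `LineageColumn` is stubbed as `unknown` because its shape is not given on this page, and `Optional<string>` is simplified to `string`.

```typescript
// Hypothetical stand-in: the real LineageColumn shape is not shown on this page.
type LineageColumn = unknown

interface Lineage {
  models: string[]
  columns?: Record<string, LineageColumn>
}

interface ConnectedNode {
  id?: string
  edges: ConnectedNode[]
}

interface LineageRequest {
  topic: 'lineage'
  payload: {
    currentLineage: Record<string, Lineage>
    newLineage: Record<string, string[]>
    mainNode: string
  }
}

interface LineagePayload {
  lineage: Record<string, Lineage>
  nodesConnections: Record<string, ConnectedNode>
}

type LineageResponse =
  | { topic: 'lineage'; payload: LineagePayload }
  | { topic: 'error'; error: Error }

// Runs a computation and maps any thrown value onto the 'error' message shape.
// `compute` is a hypothetical placeholder for the merge and traversal steps.
function toResponse(
  request: LineageRequest,
  compute: (payload: LineageRequest['payload']) => LineagePayload,
): LineageResponse {
  try {
    return { topic: 'lineage', payload: compute(request.payload) }
  } catch (err) {
    const error = err instanceof Error ? err : new Error(String(err))
    return { topic: 'error', error }
  }
}
```

Inside a worker, the returned object would be handed to `postMessage`. On the receiving side, checking `topic` first lets TypeScript narrow the message to either the payload or the error branch.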
Implementation Details
Graph Traversal Algorithm
The worker implements bidirectional graph traversal:
- Upstream: Finds all models that the main node depends on (ancestors)
- Downstream: Finds all models that depend on the main node (descendants)
Node Connection Structure
Each ConnectedNode contains:
- id: Composite ID from source and target (e.g., "model_a__model_b")
- edges: Array of connected nodes forming the graph
Lineage Merging
Merges new model relationships with existing column-level lineage:
- Preserves column lineage information from currentLineage
- Updates model dependencies from newLineage
- URI encodes all node identifiers for safe usage
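The merge rules above could look roughly like the following. This is a minimal sketch under stated assumptions rather than the source implementation: it assumes `encodeURI` is the encoding in use, that `currentLineage` is already keyed by encoded names, and it stubs `LineageColumn` as `unknown`.

```typescript
// Hypothetical stand-in: the real LineageColumn shape is not shown on this page.
type LineageColumn = unknown

interface Lineage {
  models: string[]
  columns?: Record<string, LineageColumn>
}

function mergeLineageWithModels(
  currentLineage: Record<string, Lineage>,
  newLineage: Record<string, string[]>,
): Record<string, Lineage> {
  const merged: Record<string, Lineage> = {}

  for (const name of Object.keys(newLineage)) {
    // Assumption: identifiers are encoded with encodeURI.
    const key = encodeURI(name)

    // Model dependencies always come from the new lineage data.
    const entry: Lineage = {
      models: (newLineage[name] ?? []).map(dep => encodeURI(dep)),
    }

    // Column lineage is carried over from the existing data when present.
    // Assumption: currentLineage is keyed by already-encoded names.
    const columns = currentLineage[key]?.columns
    if (columns !== undefined) {
      entry.columns = columns
    }

    merged[key] = entry
  }

  return merged
}
```

Note that in this sketch a model present only in `currentLineage` does not appear in the result, since the loop is driven by `newLineage`.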
Direction Handling
- Downstream: Select every model whose dependency list contains the node
- Upstream: Take the node's own direct dependencies
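The two directions differ only in how the neighbouring models are looked up. A small sketch of that lookup, using a hypothetical helper name `getDirectNeighbours` and a pared-down `Lineage` shape:

```typescript
type Direction = 'upstream' | 'downstream'

interface Lineage {
  models: string[]
}

// Hypothetical helper: returns the models one step away from `node`.
function getDirectNeighbours(
  lineage: Record<string, Lineage>,
  node: string,
  direction: Direction,
): string[] {
  if (direction === 'upstream') {
    // Upstream: the node's own dependency list.
    return lineage[node]?.models ?? []
  }

  // Downstream: every model that lists `node` among its dependencies.
  return Object.keys(lineage).filter(
    name => (lineage[name]?.models ?? []).indexOf(node) !== -1,
  )
}
```

The upstream lookup is a single key access, while the downstream lookup scans every entry, because the lineage record only stores edges in the dependency direction.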
Cycle Prevention
The algorithm checks if nodes are already in the result map before recursing, preventing infinite loops in cyclic graphs.
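To illustrate why the membership check matters, the sketch below walks a deliberately cyclic graph and still terminates. It is a simplified stand-in, not the worker's code: `collectReachable` is a hypothetical function that records visited nodes in a plain object instead of building `ConnectedNode` entries.

```typescript
// Hypothetical helper: collects every node reachable from `start`.
function collectReachable(
  graph: Record<string, string[]>,
  start: string,
  visited: Record<string, boolean> = {},
): string[] {
  visited[start] = true

  for (const next of graph[start] ?? []) {
    // Recurse only into nodes not yet recorded; without this check,
    // a cycle such as a -> b -> a would recurse without end.
    if (visited[next] !== true) {
      collectReachable(graph, next, visited)
    }
  }

  return Object.keys(visited)
}

// A graph with cycles: a -> b -> c -> a, and b -> a.
const cyclic: Record<string, string[]> = {
  a: ['b'],
  b: ['c', 'a'],
  c: ['a'],
}

const reachable = collectReachable(cyclic, 'a')
```

Each node is recorded before its neighbours are visited, so a back edge to an ancestor is skipped instead of followed.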
Algorithm Flow
mergeLineageWithModels
- Iterate through new lineage data
- URI encode model names
- Preserve existing column lineage
- Create merged lineage object
getNodesConnections
- Initialize empty distances map
- Traverse upstream from main node
- Traverse downstream from main node
- Return complete graph structure
getConnectedNodes (Recursive)
- Determine models based on direction
- Create node if not exists in result
- For each connected model:
  - Create connected node with edge reference
  - Add to result or append edge
  - Recurse for that model
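The flow above can be sketched as follows. This is an approximation under several assumptions, not a copy of `lineage.ts`: the function signatures, the `toEdgeId` helper, the `source__target` ID format (inferred from the "model_a__model_b" example), and the choice to recurse only into newly added models are all assumptions made for illustration.

```typescript
type Direction = 'upstream' | 'downstream'

interface Lineage {
  models: string[]
}

interface ConnectedNode {
  id?: string
  edges: ConnectedNode[]
}

// Assumed ID format, following the "model_a__model_b" example.
function toEdgeId(source: string, target: string): string {
  return `${source}__${target}`
}

function getConnectedNodes(
  direction: Direction,
  node: string,
  lineage: Record<string, Lineage>,
  result: Record<string, ConnectedNode>,
): void {
  // Determine models based on direction.
  const models =
    direction === 'upstream'
      ? lineage[node]?.models ?? []
      : Object.keys(lineage).filter(
          name => (lineage[name]?.models ?? []).indexOf(node) !== -1,
        )

  // Create the node if it does not exist in the result yet.
  let current: ConnectedNode | undefined = result[node]
  if (current === undefined) {
    current = { id: node, edges: [] }
    result[node] = current
  }

  for (const model of models) {
    // The connected node keeps an edge reference back to the current node.
    const connected: ConnectedNode =
      direction === 'upstream'
        ? { id: toEdgeId(model, node), edges: [current] }
        : { id: toEdgeId(node, model), edges: [current] }

    const existing = result[model]
    if (existing !== undefined) {
      // Already seen: append the edge and do not recurse again.
      existing.edges.push(connected)
    } else {
      result[model] = connected
      getConnectedNodes(direction, model, lineage, result)
    }
  }
}

function getNodesConnections(
  mainNode: string,
  lineage: Record<string, Lineage>,
): Record<string, ConnectedNode> {
  const result: Record<string, ConnectedNode> = {}
  getConnectedNodes('upstream', mainNode, lineage, result)
  getConnectedNodes('downstream', mainNode, lineage, result)
  return result
}
```

Because both passes share one `result` map, a model reached upstream is not re-expanded when it is encountered again downstream; it only gains an extra edge.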
Usage Examples
```typescript
// Create worker instance
const lineageWorker = new Worker(
  new URL('./workers/lineage.ts', import.meta.url),
  { type: 'module' }
)

// Register the response handler before sending the request
lineageWorker.onmessage = (e: MessageEvent) => {
  if (e.data.topic === 'lineage') {
    const { lineage, nodesConnections } = e.data.payload

    // lineage contains merged model and column info
    console.log('Models:', Object.keys(lineage))

    // nodesConnections is a graph structure
    console.log('Main node connections:', nodesConnections['model_b'])

    // Each node has edges to connected nodes
    nodesConnections['model_b'].edges.forEach((edge: ConnectedNode) => {
      console.log('Connected via:', edge.id)
    })
  } else if (e.data.topic === 'error') {
    console.error('Lineage computation failed:', e.data.error)
  }
}

// Send lineage computation request
lineageWorker.postMessage({
  topic: 'lineage',
  payload: {
    currentLineage: {
      'model_a': {
        models: ['source_1', 'source_2'],
        columns: { /* column lineage */ }
      }
    },
    newLineage: {
      'model_b': ['model_a', 'source_3'],
      'model_c': ['model_b']
    },
    mainNode: 'model_b'
  }
})

// Visualize the graph
function visualizeLineage(nodesConnections: Record<string, ConnectedNode>) {
  Object.entries(nodesConnections).forEach(([node, data]) => {
    console.log(`Node: ${node}`)
    console.log(`  Edges: ${data.edges.length}`)
    data.edges.forEach(edge => {
      console.log(`    -> ${edge.id}`)
    })
  })
}
```
Performance Considerations
- Off main thread: Graph computation doesn't block UI
- Efficient traversal: Checks for visited nodes to avoid reprocessing
- Scoped computation: Computes connections only for the requested main node
- URI encoding: Handles special characters in model names