

Implementation:TobikoData Sqlmesh Lineage Worker

From Leeroopedia


Knowledge Sources
Domains: Web_UI, Data_Lineage, Web_Workers
Last Updated: 2026-02-07 20:00 GMT

Overview

Web Worker for computing model dependency graphs and lineage connections in the SQLMesh web UI.

Description

The Lineage_Worker is a dedicated Web Worker that processes model lineage data off the main thread. It merges new model relationships into the existing lineage, keeping any column-level lineage already present, and then traverses the resulting graph to compute upstream and downstream dependencies and the set of nodes connected to a chosen main node. The result drives the data-flow visualization in the SQLMesh UI.

Usage

Use this worker to compute lineage graphs for the SQLMesh visualization UI. Post the lineage data together with a main node identifier; the worker replies with the merged lineage and a graph of the nodes connected to that main node.

Code Reference

Source Location

Message Protocol

// Input message
{
  topic: 'lineage',
  payload: {
    currentLineage: Record<string, Lineage>,
    newLineage: Record<string, string[]>,
    mainNode: string
  }
}

// Output message
{
  topic: 'lineage',
  payload: {
    lineage: Record<string, Lineage>,
    nodesConnections: Record<string, ConnectedNode>
  }
}

// Error message
{
  topic: 'error',
  error: Error
}

// Types (Optional and LineageColumn are helper types defined elsewhere in the codebase, not shown here)
interface Lineage {
  models: string[]
  columns?: Record<string, LineageColumn>
}

interface ConnectedNode {
  id?: Optional<string>
  edges: ConnectedNode[]
}
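The protocol above can be expressed as a discriminated union on `topic`, which lets the main thread narrow replies safely. The sketch below is an illustration under stated assumptions: `Optional` is redefined locally as `T | undefined`, `LineageColumn` is a placeholder because its shape is not documented on this page, and the names `LineageRequest`, `LineageResponse`, `LineageError`, `WorkerReply`, and `isLineageResponse` are hypothetical.

```typescript
// Local stand-ins (assumptions): the real definitions live elsewhere in the codebase.
type Optional<T> = T | undefined
type LineageColumn = Record<string, unknown> // placeholder; real shape not documented here

interface Lineage {
  models: string[]
  columns?: Record<string, LineageColumn>
}

interface ConnectedNode {
  id?: Optional<string>
  edges: ConnectedNode[]
}

// Hypothetical name: request sent from the main thread to the worker.
interface LineageRequest {
  topic: 'lineage'
  payload: {
    currentLineage: Record<string, Lineage>
    newLineage: Record<string, string[]>
    mainNode: string
  }
}

// Hypothetical name: successful reply from the worker.
interface LineageResponse {
  topic: 'lineage'
  payload: {
    lineage: Record<string, Lineage>
    nodesConnections: Record<string, ConnectedNode>
  }
}

// Hypothetical name: failure reply from the worker.
interface LineageError {
  topic: 'error'
  error: Error
}

// Every message the worker may send back, discriminated by `topic`.
type WorkerReply = LineageResponse | LineageError

// Runtime check for untyped data (e.g. MessageEvent.data) before trusting its shape.
function isLineageResponse(data: unknown): data is LineageResponse {
  if (typeof data !== 'object' || data === null) return false
  const msg = data as { topic?: unknown; payload?: unknown }
  return msg.topic === 'lineage' && typeof msg.payload === 'object' && msg.payload !== null
}
```

Checking `topic` at runtime matters because `MessageEvent.data` is untyped; the compiler cannot verify what actually crossed the worker boundary.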

Import

// Worker is typically loaded via URL, not direct import
const lineageWorker = new Worker(
  new URL('./workers/lineage.ts', import.meta.url),
  { type: 'module' }
)
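Because the protocol is a single request/reply exchange, a caller may find it convenient to wrap it in a Promise. A minimal sketch follows; `requestLineage` and `WorkerLike` are hypothetical names, and `WorkerLike` is a structural stand-in for the browser `Worker` so the code can be exercised without a DOM.

```typescript
// Shapes from the Message Protocol section; column lineage is left untyped here.
interface Lineage {
  models: string[]
  columns?: Record<string, unknown>
}

interface ConnectedNode {
  id?: string
  edges: ConnectedNode[]
}

interface LineagePayload {
  currentLineage: Record<string, Lineage>
  newLineage: Record<string, string[]>
  mainNode: string
}

interface LineageResult {
  lineage: Record<string, Lineage>
  nodesConnections: Record<string, ConnectedNode>
}

// Minimal structural stand-in for the browser Worker (assumption, for testability).
interface WorkerLike {
  postMessage(message: unknown): void
  addEventListener(type: 'message', listener: (event: { data: any }) => void): void
  removeEventListener(type: 'message', listener: (event: { data: any }) => void): void
}

// Hypothetical helper: send one request and settle on the first 'lineage' or 'error' reply.
function requestLineage(worker: WorkerLike, payload: LineagePayload): Promise<LineageResult> {
  return new Promise((resolve, reject) => {
    const onMessage = (event: { data: any }): void => {
      const topic = event.data?.topic
      if (topic !== 'lineage' && topic !== 'error') return // ignore unrelated messages
      worker.removeEventListener('message', onMessage)
      if (topic === 'lineage') resolve(event.data.payload as LineageResult)
      else reject(event.data.error)
    }
    worker.addEventListener('message', onMessage)
    worker.postMessage({ topic: 'lineage', payload })
  })
}
```

Since the messages carry no request ID, replies cannot be matched to specific requests; this wrapper assumes at most one request is in flight per worker at a time.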

I/O Contract

Input Message

Field | Type | Required | Description
--- | --- | --- | ---
topic | 'lineage' | Yes | Message type identifier
payload.currentLineage | Record<string, Lineage> | Yes | Existing lineage data with column info
payload.newLineage | Record<string, string[]> | Yes | New model relationships (model -> dependencies)
payload.mainNode | string | Yes | Starting node for graph traversal

Output Message

Field | Type | Description
--- | --- | ---
topic | 'lineage' or 'error' | Message type identifier
payload.lineage | Record<string, Lineage> | Merged lineage with models and columns
payload.nodesConnections | Record<string, ConnectedNode> | Graph of connected nodes with edges
error | Error | Error object if topic is 'error'
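On the worker side, this contract implies a dispatcher that answers a `lineage` request with either a `lineage` reply or an `error` reply. The sketch below is one way to structure that, not the worker's actual source: `handleMessage` is hypothetical, and its `compute` and `reply` callbacks stand in for the real merge/traversal logic and for `self.postMessage`, which keeps the function testable outside a worker.

```typescript
interface Lineage {
  models: string[]
  columns?: Record<string, unknown>
}

interface ConnectedNode {
  id?: string
  edges: ConnectedNode[]
}

interface LineagePayload {
  currentLineage: Record<string, Lineage>
  newLineage: Record<string, string[]>
  mainNode: string
}

interface LineageResult {
  lineage: Record<string, Lineage>
  nodesConnections: Record<string, ConnectedNode>
}

// The two reply shapes from the output table.
type Reply =
  | { topic: 'lineage'; payload: LineageResult }
  | { topic: 'error'; error: Error }

// Hypothetical dispatcher: ignores unknown topics, turns exceptions into 'error' replies.
function handleMessage(
  data: { topic?: string; payload?: LineagePayload },
  compute: (payload: LineagePayload) => LineageResult,
  reply: (message: Reply) => void,
): void {
  if (data.topic !== 'lineage' || data.payload === undefined) return
  try {
    reply({ topic: 'lineage', payload: compute(data.payload) })
  } catch (err) {
    // Normalize non-Error throws so the reply always matches the error shape.
    reply({ topic: 'error', error: err instanceof Error ? err : new Error(String(err)) })
  }
}

// Wiring inside a worker file might look like (computeLineage is hypothetical):
// self.onmessage = (e) => handleMessage(e.data, computeLineage, msg => self.postMessage(msg))
```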

Implementation Details

Graph Traversal Algorithm

The worker implements bidirectional graph traversal:

  • Upstream: Finds all models that the main node depends on (ancestors)
  • Downstream: Finds all models that depend on the main node (descendants)
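For intuition, the two directions can be computed as reachability sets over the `newLineage` map (model to dependencies). This is an illustrative iterative depth-first search with a visited set, not the worker's recursive routine, and it collects node names only rather than the `ConnectedNode` edge structure; `ancestors` and `descendants` are hypothetical names.

```typescript
// model -> direct dependencies, as in payload.newLineage
type Dependencies = Record<string, string[]>

// Upstream: everything the node depends on, directly or transitively.
function ancestors(deps: Dependencies, node: string): Set<string> {
  const seen = new Set<string>()
  const stack = [...(deps[node] ?? [])]
  while (stack.length > 0) {
    const current = stack.pop()!
    if (seen.has(current)) continue // visited check keeps cycles from looping forever
    seen.add(current)
    stack.push(...(deps[current] ?? []))
  }
  return seen
}

// Downstream: everything that depends on the node, found by walking the reversed map.
function descendants(deps: Dependencies, node: string): Set<string> {
  const dependents: Dependencies = {}
  for (const [model, parents] of Object.entries(deps)) {
    for (const parent of parents) {
      (dependents[parent] ??= []).push(model)
    }
  }
  return ancestors(dependents, node)
}
```

Building the reverse (dependents) map once turns every downstream lookup into a direct index instead of a scan over all models.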

Node Connection Structure

Each ConnectedNode contains:

  • id: Optional composite ID built from the source and target node names (e.g., "model_a__model_b")
  • edges: Array of connected nodes forming the graph

Lineage Merging

Merges new model relationships with existing column-level lineage:

  • Preserves column lineage information from currentLineage
  • Updates model dependencies from newLineage
  • URI-encodes all node identifiers so that names containing special characters are safe to use as keys and IDs

Direction Handling

  • Downstream: Find models where node appears in their dependencies
  • Upstream: Get direct dependencies of the node
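The asymmetry between the two directions can be sketched as follows (the `neighbors` helper is hypothetical): upstream is a direct lookup on the node's own entry, while downstream has to scan every entry.

```typescript
type Direction = 'upstream' | 'downstream'

interface Lineage {
  models: string[]
}

// Hypothetical helper returning the direct neighbors of `node` in one direction.
function neighbors(direction: Direction, node: string, lineage: Record<string, Lineage>): string[] {
  if (direction === 'upstream') {
    // Direct dependencies are stored on the node itself.
    return lineage[node]?.models ?? []
  }
  // Downstream: keep every model that lists `node` among its dependencies.
  return Object.entries(lineage)
    .filter(([, entry]) => entry.models.includes(node))
    .map(([model]) => model)
}
```

Because the downstream branch scans all models on each call, a traversal that calls it for many nodes can approach quadratic work; precomputing a reverse index is a common alternative if that becomes a bottleneck.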

Cycle Prevention

The algorithm checks if nodes are already in the result map before recursing, preventing infinite loops in cyclic graphs.

Algorithm Flow

mergeLineageWithModels

  1. Iterate through new lineage data
  2. URI encode model names
  3. Preserve existing column lineage
  4. Create merged lineage object
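A sketch of these merge steps, under two assumptions: encoding uses `encodeURI` (the page only says identifiers are URI-encoded, not which function), and `currentLineage` is keyed by already-encoded names, for example because it is the previous output of this worker.

```typescript
interface Lineage {
  models: string[]
  columns?: Record<string, unknown>
}

function mergeLineageWithModels(
  currentLineage: Record<string, Lineage>,
  newLineage: Record<string, string[]>,
): Record<string, Lineage> {
  const merged: Record<string, Lineage> = {}
  // 1. Iterate through the new lineage data.
  for (const [model, dependencies] of Object.entries(newLineage)) {
    // 2. URI-encode the model name and its dependencies (encodeURI is an assumption).
    const id = encodeURI(model)
    merged[id] = {
      models: dependencies.map(dep => encodeURI(dep)),
      // 3. Preserve existing column lineage (assumes currentLineage uses encoded keys).
      columns: currentLineage[id]?.columns,
    }
  }
  // 4. Return the merged lineage object.
  return merged
}
```

Models that appear only in `currentLineage` are dropped in this sketch, since the steps above iterate over the new lineage data alone.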

getNodesConnections

  1. Initialize empty distances map
  2. Traverse upstream from main node
  3. Traverse downstream from main node
  4. Return complete graph structure

getConnectedNodes (Recursive)

  1. Determine models based on direction
  2. Create node if not exists in result
  3. For each connected model:
    1. Create connected node with edge reference
    2. Add to result or append edge
    3. Recurse for that model
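The recursive steps, together with `getNodesConnections`, might be sketched as follows. This is a reconstruction from the description above, not the SQLMesh source: the `toID` helper and its `__` separator are assumed from the "model_a__model_b" example, and this version applies the cycle check by recursing only into nodes that are not yet in the result map.

```typescript
interface Lineage {
  models: string[]
}

interface ConnectedNode {
  id?: string
  edges: ConnectedNode[]
}

type Direction = 'upstream' | 'downstream'

// Hypothetical composite ID, following the "model_a__model_b" example.
const toID = (source: string, target: string): string => `${source}__${target}`

function getConnectedNodes(
  direction: Direction,
  node: string,
  lineage: Record<string, Lineage>,
  result: Record<string, ConnectedNode>,
): void {
  // 1. Determine neighboring models based on direction.
  const models =
    direction === 'upstream'
      ? (lineage[node]?.models ?? [])
      : Object.entries(lineage)
          .filter(([, entry]) => entry.models.includes(node))
          .map(([model]) => model)

  // 2. Create the node if it is not in the result yet.
  if (result[node] === undefined) result[node] = { edges: [] }

  // 3. Link each neighbor back to `node`.
  for (const model of models) {
    const connected: ConnectedNode = { id: toID(node, model), edges: [] }
    const existing = result[model]
    if (existing !== undefined) {
      // Already visited: record the extra edge but do not recurse (cycle guard).
      existing.edges.push(connected)
    } else {
      result[model] = { edges: [connected] }
      getConnectedNodes(direction, model, lineage, result)
    }
  }
}

function getNodesConnections(mainNode: string, lineage: Record<string, Lineage>): Record<string, ConnectedNode> {
  const distances: Record<string, ConnectedNode> = {}
  // Both passes share one result map, so the main node is created only once.
  getConnectedNodes('upstream', mainNode, lineage, distances)
  getConnectedNodes('downstream', mainNode, lineage, distances)
  return distances
}
```

In this sketch an edge is stored on the neighbor it points to, with an ID naming the node it was reached from.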

Usage Examples

// Create worker instance
const lineageWorker = new Worker(
  new URL('./workers/lineage.ts', import.meta.url),
  { type: 'module' }
)

// Register the response handler before sending the request
lineageWorker.onmessage = (e: MessageEvent) => {
  if (e.data.topic === 'lineage') {
    const { lineage, nodesConnections } = e.data.payload

    // lineage contains merged model and column info
    console.log('Models:', Object.keys(lineage))

    // nodesConnections is a graph structure keyed by node name
    console.log('Main node connections:', nodesConnections['model_b'])

    // Each node has edges to connected nodes
    nodesConnections['model_b'].edges.forEach((edge: ConnectedNode) => {
      console.log('Connected via:', edge.id)
    })
  } else if (e.data.topic === 'error') {
    console.error('Lineage computation failed:', e.data.error)
  }
}

// Send lineage computation request
lineageWorker.postMessage({
  topic: 'lineage',
  payload: {
    // Existing lineage for model_b; its column info is preserved in the merge
    currentLineage: {
      'model_b': {
        models: ['model_a', 'source_3'],
        columns: { /* column lineage */ }
      }
    },
    newLineage: {
      'model_b': ['model_a', 'source_3'],
      'model_c': ['model_b']
    },
    mainNode: 'model_b'
  }
})

// Visualize the graph (ConnectedNode is the type from the Message Protocol section)
function visualizeLineage(nodesConnections: Record<string, ConnectedNode>) {
  Object.entries(nodesConnections).forEach(([node, data]) => {
    console.log(`Node: ${node}`)
    console.log(`  Edges: ${data.edges.length}`)
    data.edges.forEach(edge => {
      console.log(`    -> ${edge.id}`)
    })
  })
}

Performance Considerations

  • Off the main thread: Graph computation doesn't block the UI
  • Efficient traversal: Checks for visited nodes to avoid reprocessing
  • Lazy evaluation: Only computes connections for the requested main node
  • URI encoding: Handles special characters in model names
