SDK & Engine Architecture

Understanding the SimulationEngine, event sourcing, and workflow execution

SimulationEngine

Overview

The SimulationEngine is the core class that executes workflows defined in the V3 DSL (Domain-Specific Language). It processes tokens through a directed graph of nodes, maintaining an event-sourced ledger of all operations for full auditability and time-travel debugging.

Key Characteristics

  • Single Instance per Execution: Not enforced as a strict singleton, but typically one engine instance is created per execution
  • Immutable Event Log: All state changes are recorded in an append-only ledger
  • BFS Execution Strategy: Uses an active task queue for breadth-first traversal
  • Deterministic: Given the same inputs and DSL, produces identical results

External Event Queue

The ExternalEventQueue is a pre-loaded queue of events that can be mapped to DataSource nodes in the workflow. This allows you to feed real event data (e.g., from the Parser) into the engine instead of relying on generated data.

How It Works

  1. Load Events: Populate the external event queue with parsed/reconciled events
  2. Map to DataSources: Each event has a targetDataSourceId specifying which DataSource node it belongs to
  3. Engine Initialization: Engine reads DSL (which contains DataSource definitions) and establishes the mapping
  4. Step-by-Step Extraction: At each engine step, the active DataSource extracts events from the queue and emits them as tokens
  5. Downstream Processing: Tokens flow through the DSL graph (ProcessNodes, FSMs, Aggregators, etc.)

Code Example

import { SimulationEngine } from './core';
import { ExternalEventQueue } from './core/ExternalEventQueue';

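// Assumes dslTemplate (the V3 DSL definition) and solarEvents (parsed events)
// are already defined elsewhere.
// Initialize engine with DSL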
const engine = new SimulationEngine();
engine.initialize(dslTemplate);

// Get external event queue
const eventQueue = engine.getExternalEventQueue();

// Add events with target DataSource mapping
solarEvents.forEach(event => {
  eventQueue.addEvent({
    ...event,
    targetDataSourceId: 'solar_readings_source'  // Maps to DataSource node
  });
});

// Run simulation step-by-step
while (engine.getQueue().size() > 0) {
  await engine.step();
}

// Access results
const results = engine.getNodeState('final_sink');
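
The mapping between events and DataSource nodes described above can be pictured with a small stand-in class. This is a minimal sketch under assumptions, not the actual ExternalEventQueue: the class name SketchExternalEventQueue, the extractFor method, the payload field, and the 'wind_source' id are all hypothetical and only illustrate how a targetDataSourceId could partition a pre-loaded queue.

```typescript
// Hypothetical sketch; the real ExternalEventQueue may be structured differently.
interface ExternalEvent {
  id: string;
  targetDataSourceId: string; // DataSource node that should emit this event
  payload: Record<string, unknown>;
}

class SketchExternalEventQueue {
  // One FIFO list per DataSource id, so extraction order matches load order.
  private readonly bySource = new Map<string, ExternalEvent[]>();

  addEvent(event: ExternalEvent): void {
    const pending = this.bySource.get(event.targetDataSourceId) ?? [];
    pending.push(event);
    this.bySource.set(event.targetDataSourceId, pending);
  }

  // Remove and return up to `max` events for one DataSource, oldest first.
  extractFor(dataSourceId: string, max: number = 1): ExternalEvent[] {
    const pending = this.bySource.get(dataSourceId) ?? [];
    return pending.splice(0, max);
  }

  // Total number of events still waiting across all DataSources.
  size(): number {
    let total = 0;
    this.bySource.forEach((pending) => {
      total += pending.length;
    });
    return total;
  }
}

const queue = new SketchExternalEventQueue();
queue.addEvent({ id: "e1", targetDataSourceId: "solar_readings_source", payload: { reading: 42.5 } });
queue.addEvent({ id: "e2", targetDataSourceId: "wind_source", payload: { reading: 7.1 } });
queue.addEvent({ id: "e3", targetDataSourceId: "solar_readings_source", payload: { reading: 40.1 } });

// Each DataSource only sees the events addressed to it.
const nextSolar = queue.extractFor("solar_readings_source");
```

Keeping one list per DataSource means a step that activates a single DataSource never has to scan events addressed to other nodes.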

Execution Model: BFS Strategy

The engine uses a Breadth-First Search (BFS) execution strategy implemented via an active task queue. This ensures that all nodes at a given "level" of the graph complete before moving to the next level.

Active Task Queue

The activeTaskQueue contains tasks (node operations) that are ready to execute.

  • Step 1: DataSource nodes generate tokens and add downstream nodes to the queue
  • Step 2: ProcessNodes receive tokens, transform them, and add their downstream nodes to the queue
  • Step 3: FSMs update state, Aggregators collect tokens, Multiplexers route tokens
  • Step 4: Sinks collect final output

Execution Flow

// Pseudo-code for engine.step()

1. Pop next task from activeTaskQueue
2. Execute task (process token at node)
3. Record operation in event ledger
4. If node produces output:
   a. Create new token(s)
   b. Add downstream nodes to activeTaskQueue
5. The caller repeats step() until the queue is empty
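
The flow above can be sketched in TypeScript as a FIFO queue over a small graph. This is an illustration under assumptions rather than the engine's actual code: SketchEngine, Task, Graph, and the node ids are hypothetical, and the ledger here is reduced to a list of strings.

```typescript
// Hypothetical types; the real engine's task and node shapes may differ.
interface Task {
  nodeId: string;
  token: string;
}

type Graph = Record<string, string[]>; // nodeId -> downstream node ids

class SketchEngine {
  private readonly activeTaskQueue: Task[] = [];
  readonly ledger: string[] = []; // simplified stand-in for the event ledger

  constructor(private readonly graph: Graph) {}

  enqueue(task: Task): void {
    this.activeTaskQueue.push(task);
  }

  pending(): number {
    return this.activeTaskQueue.length;
  }

  // One step: pop a task, record it, then schedule the downstream nodes.
  step(): void {
    const task = this.activeTaskQueue.shift(); // FIFO order gives breadth-first traversal
    if (task === undefined) return;
    this.ledger.push(`${task.nodeId}:${task.token}`);
    for (const next of this.graph[task.nodeId] ?? []) {
      this.activeTaskQueue.push({ nodeId: next, token: task.token });
    }
  }
}

// A diamond-shaped graph: source fans out to a and b, which both feed sink.
const graph: Graph = { source: ["a", "b"], a: ["sink"], b: ["sink"], sink: [] };
const engine = new SketchEngine(graph);
engine.enqueue({ nodeId: "source", token: "t1" });
while (engine.pending() > 0) {
  engine.step();
}
// engine.ledger is now:
// ["source:t1", "a:t1", "b:t1", "sink:t1", "sink:t1"]
```

Both a and b are recorded before either visit to sink, which is the level-by-level ordering the BFS strategy describes.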

Event Ledger (Event Sourcing)

The ledger is an append-only log of all node operations. Every state change, token transformation, and decision is recorded as an immutable event. This provides:

Full Audit Trail

Trace every token's path through the workflow, including all transformations and state transitions

Time-Travel Debugging

Reconstruct the execution as it stood at any earlier point by replaying ledger events up to that point

Provenance Tracking

Understand exactly how and why a token reached its final state

Deterministic Replay

Replaying the same ledger events always rebuilds the same execution state

Ledger Event Structure

{
  "eventId": "evt_12345",
  "timestamp": 1705322400000,
  "nodeId": "process_node_1",
  "operation": "TOKEN_PROCESSED",
  "inputToken": {
    "id": "token_001",
    "correlationIds": ["solar_reading_001"],
    "data": { "reading": 42.5 }
  },
  "outputToken": {
    "id": "token_002",
    "correlationIds": ["solar_reading_001"],
    "data": { "reading": 42.5, "carbonCredits": 0.02125 }
  }
}
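
The event shape above can be given a type, and replay can then be sketched as a fold over the ledger. The field names follow the example; the TypeScript types, the optional markers on inputToken and outputToken, and the replayUntil function are assumptions made for illustration and are not part of the documented SDK.

```typescript
// Field names follow the example event; the types themselves are assumptions.
interface Token {
  id: string;
  correlationIds: string[];
  data: Record<string, unknown>;
}

interface LedgerEvent {
  eventId: string;
  timestamp: number; // epoch milliseconds
  nodeId: string;
  operation: string;
  inputToken?: Token;
  outputToken?: Token;
}

// Hypothetical replay: rebuild "last output per node" as of a given time
// by applying, in ledger order, every event at or before `until`.
function replayUntil(ledger: readonly LedgerEvent[], until: number): Map<string, Token> {
  const lastOutputByNode = new Map<string, Token>();
  for (const event of ledger) {
    if (event.timestamp > until) continue; // ignore events after the cut-off
    if (event.outputToken !== undefined) {
      lastOutputByNode.set(event.nodeId, event.outputToken);
    }
  }
  return lastOutputByNode;
}
```

Because the function only reads the ledger and never mutates it, calling it twice with the same arguments yields the same reconstructed state.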

API: Accessing Node State

Get Current Node State

// SDK Method
const nodeState = engine.getNodeState('node_id');

// Returns:
{
  nodeId: 'compliance_fsm',
  currentState: 'APPROVED',
  tokensProcessed: 206,
  aggregates: { APPROVED: 206, UNDER_REVIEW: 3 },
  lastUpdate: 1705322400000
}

Use case: Check current state of FSM, Aggregator, or any node

Get Node Activity Log

// API Endpoint
GET /api/engine/templates/{templateId}/executions/{executionId}/nodes/{nodeId}/activity

// Returns:
[
  {
    "timestamp": 1705322400000,
    "operation": "TOKEN_PROCESSED",
    "tokenId": "token_001",
    "details": { "input": {...}, "output": {...} }
  },
  ...
]

Use case: View all operations performed by a specific node

Get Node Events

// API Endpoint
GET /api/engine/templates/{templateId}/executions/{executionId}/nodes/{nodeId}/events

// Returns ledger events filtered by node
[
  {
    "eventId": "evt_12345",
    "timestamp": 1705322400000,
    "operation": "STATE_TRANSITION",
    "from": "PENDING",
    "to": "APPROVED"
  },
  ...
]

Use case: Audit trail for specific node operations
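
The two node endpoints above share one path layout, so a client might build them with a single helper. The sketch below only assembles the path from the documented template; nodeResourcePath, ExecutionRef, and the example ids are hypothetical names, and the HTTP call itself is left to whatever client the application already uses.

```typescript
// The two node-level resources documented above.
type NodeResource = "activity" | "events";

// Hypothetical grouping of the two ids that identify an execution.
interface ExecutionRef {
  templateId: string;
  executionId: string;
}

// Build the request path for a node resource, escaping each path segment.
function nodeResourcePath(ref: ExecutionRef, nodeId: string, resource: NodeResource): string {
  return (
    `/api/engine/templates/${encodeURIComponent(ref.templateId)}` +
    `/executions/${encodeURIComponent(ref.executionId)}` +
    `/nodes/${encodeURIComponent(nodeId)}/${resource}`
  );
}

// Example ids are placeholders, not values from a real deployment.
const ref: ExecutionRef = { templateId: "tpl_1", executionId: "exec_1" };
const eventsPath = nodeResourcePath(ref, "compliance_fsm", "events");
// eventsPath === "/api/engine/templates/tpl_1/executions/exec_1/nodes/compliance_fsm/events"
```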

API: Token Provenance & Lineage

Get Token Lineage

// API Endpoint
GET /api/engine/templates/{templateId}/executions/{executionId}/tokens/{correlationId}/lineage

// Returns full path of token through workflow
{
  "correlationId": "solar_reading_001",
  "path": [
    {
      "nodeId": "solar_readings_source",
      "timestamp": 1705322400000,
      "operation": "TOKEN_GENERATED",
      "data": { "reading": 42.5 }
    },
    {
      "nodeId": "compliance_fsm",
      "timestamp": 1705322401000,
      "operation": "STATE_TRANSITION",
      "from": "PENDING",
      "to": "APPROVED",
      "data": { "reading": 42.5, "anomalyCount": 0 }
    },
    {
      "nodeId": "carbon_calculator",
      "timestamp": 1705322402000,
      "operation": "TOKEN_TRANSFORMED",
      "data": { "reading": 42.5, "carbonCredits": 0.02125 }
    },
    {
      "nodeId": "rec_minter",
      "timestamp": 1705322403000,
      "operation": "TOKEN_COLLECTED",
      "data": { "reading": 42.5, "carbonCredits": 0.02125, "recId": "REC_001" }
    }
  ]
}

Use case: Understand complete journey of a token through the workflow

Correlation ID

Every token has one or more correlationIds that uniquely identify it across the workflow. Use these IDs to track tokens from source to sink.
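
To show how correlation IDs tie ledger events into a lineage, here is a minimal sketch that derives a path from a list of ledger-like entries. It assumes each entry carries the input and output tokens as in the ledger event structure; buildLineage, carries, and the type names are hypothetical, and the real lineage endpoint may compute its result differently.

```typescript
// Hypothetical, trimmed-down shapes based on the ledger event example.
interface TokenRef {
  id: string;
  correlationIds: string[];
}

interface LedgerEntry {
  nodeId: string;
  timestamp: number;
  operation: string;
  inputToken?: TokenRef;
  outputToken?: TokenRef;
}

interface LineageStep {
  nodeId: string;
  timestamp: number;
  operation: string;
}

// True when the token exists and carries the given correlation ID.
function carries(token: TokenRef | undefined, correlationId: string): boolean {
  return token !== undefined && token.correlationIds.indexOf(correlationId) !== -1;
}

// Collect every entry that touched the correlation ID, ordered by time.
function buildLineage(ledger: readonly LedgerEntry[], correlationId: string): LineageStep[] {
  return ledger
    .filter((e) => carries(e.inputToken, correlationId) || carries(e.outputToken, correlationId))
    .sort((a, b) => a.timestamp - b.timestamp) // filter() returned a copy, so the ledger is untouched
    .map(({ nodeId, timestamp, operation }) => ({ nodeId, timestamp, operation }));
}
```

Matching on either the input or the output token keeps source events (which have no input) and sink events (which have no output) in the same path.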

Get Global Last Step

// SDK Method
const lastStep = engine.getCurrentTick();

// Returns the current simulation tick/step number, e.g. 1523

Use case: Know how many steps have been executed in the simulation

Get Per-Node Last Step

// SDK Method
const nodeState = engine.getNodeState('node_id');
const lastUpdate = nodeState.lastUpdate;  // Timestamp of last operation

// Or via API
GET /api/engine/templates/{templateId}/executions/{executionId}/nodes/{nodeId}/activity?limit=1

// Returns most recent operation
[
  {
    "timestamp": 1705322403000,  // Last update time
    "operation": "TOKEN_PROCESSED",
    "step": 1523
  }
]

Use case: Check when a specific node last performed an operation

Summary

  • SimulationEngine: Core execution class using BFS strategy with active task queue
  • External Event Queue: Pre-loaded events mapped to DataSources via targetDataSourceId
  • Event Ledger: Immutable log of all operations for auditability and time-travel debugging
  • API Access: Full access to node state, activity logs, and token provenance via REST API