SDK & Engine Architecture
Understanding the SimulationEngine, event sourcing, and workflow execution
SimulationEngine
Overview
The SimulationEngine is the core class that executes workflows defined in the V3 DSL (Domain-Specific Language). It processes tokens through a directed graph of nodes, maintaining an event-sourced ledger of all operations for full auditability and time-travel debugging.
Key Characteristics
- Singleton Pattern: While not enforced as a strict singleton, there is typically one engine instance per execution
- Immutable Event Log: All state changes are recorded in an append-only ledger
- BFS Execution Strategy: Uses an active task queue for breadth-first traversal
- Deterministic: Given the same inputs and DSL, the engine produces identical results
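The append-only ledger idea can be sketched in a few lines. The `EventLedger` class and its fields here are illustrative, not the SDK's actual internals:

```typescript
// Hypothetical sketch of an append-only event log: events can be added,
// never updated or removed, and each one is frozen on append.
interface LedgerEvent {
  eventId: string;
  timestamp: number;
  nodeId: string;
  operation: string;
}

class EventLedger {
  private events: LedgerEvent[] = [];

  // Append-only: there is no update or delete operation.
  append(event: LedgerEvent): void {
    this.events.push(Object.freeze({ ...event }));
  }

  // Reads return a copy so callers cannot mutate history.
  all(): readonly LedgerEvent[] {
    return [...this.events];
  }
}

const ledger = new EventLedger();
ledger.append({ eventId: "evt_1", timestamp: 1, nodeId: "src", operation: "TOKEN_GENERATED" });
ledger.append({ eventId: "evt_2", timestamp: 2, nodeId: "proc", operation: "TOKEN_PROCESSED" });
```

Freezing on append is one way to make the "immutable" guarantee concrete; the real engine may enforce it differently.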
External Event Queue
The ExternalEventQueue is a pre-loaded queue of events that can be mapped to DataSource nodes in the workflow. This allows you to feed real event data (e.g., from the Parser) into the engine instead of relying on generated data.
How It Works
1. Load Events: Populate the external event queue with parsed/reconciled events
2. Map to DataSources: Each event has a targetDataSourceId specifying which DataSource node it belongs to
3. Engine Initialization: The engine reads the DSL (which contains the DataSource definitions) and establishes the mapping
4. Step-by-Step Extraction: At each engine step, the active DataSource extracts events from the queue and emits them as tokens
5. Downstream Processing: Tokens flow through the DSL graph (ProcessNodes, FSMs, Aggregators, etc.)
Code Example
import { SimulationEngine } from './core';
import { ExternalEventQueue } from './core/ExternalEventQueue';
// Initialize engine with DSL
const engine = new SimulationEngine();
engine.initialize(dslTemplate);
// Get external event queue
const eventQueue = engine.getExternalEventQueue();
// Add events with target DataSource mapping
solarEvents.forEach(event => {
eventQueue.addEvent({
...event,
targetDataSourceId: 'solar_readings_source' // Maps to DataSource node
});
});
// Run simulation step-by-step
while (engine.getQueue().size() > 0) {
await engine.step();
}
// Access results
const results = engine.getNodeState('final_sink');
Execution Model: BFS Strategy
The engine uses a Breadth-First Search (BFS) execution strategy implemented via an active task queue. This ensures that all nodes at a given "level" of the graph complete before moving to the next level.
Active Task Queue
The activeTaskQueue contains tasks (node operations) that are ready to execute.
- Step 1: DataSource nodes generate tokens and add downstream nodes to the queue
- Step 2: ProcessNodes receive tokens, transform them, and add their downstream nodes to the queue
- Step 3: FSMs update state, Aggregators collect tokens, Multiplexers route tokens
- Step 4: Sinks collect the final output
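The steps above reduce to a plain loop over a FIFO queue. This is a minimal sketch: the graph, the `Task`/`Token` shapes, and the node names are hypothetical, not the engine's real types:

```typescript
// Illustrative BFS execution: a FIFO activeTaskQueue drives level-by-level
// traversal of a tiny three-node workflow graph.
interface Token { id: string; data: Record<string, unknown>; }
interface Task { nodeId: string; token: Token; }

// Downstream edges of the example graph.
const downstream: Record<string, string[]> = {
  solar_source: ["transform"],
  transform: ["final_sink"],
  final_sink: [],
};

const ledger: string[] = []; // records each executed operation in order
const activeTaskQueue: Task[] = [
  { nodeId: "solar_source", token: { id: "t1", data: { reading: 42.5 } } },
];

while (activeTaskQueue.length > 0) {
  const task = activeTaskQueue.shift()!; // FIFO pop => breadth-first order
  ledger.push(`${task.nodeId}:TOKEN_PROCESSED`);
  for (const next of downstream[task.nodeId]) {
    // Fan out: enqueue downstream work with a derived token
    activeTaskQueue.push({ nodeId: next, token: { ...task.token } });
  }
}
```

Because the queue is first-in first-out, each "level" of the graph drains before the next one starts, which is exactly the BFS property described above.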
Execution Flow
// Pseudo-code for engine.step()
1. Pop next task from activeTaskQueue
2. Execute task (process token at node)
3. Record operation in event ledger
4. If node produces output:
a. Create new token(s)
b. Add downstream nodes to activeTaskQueue
5. Repeat until queue is empty
Event Ledger (Event Sourcing)
The ledger is an append-only log of all node operations. Every state change, token transformation, and decision is recorded as an immutable event. This provides:
Full Audit Trail
Trace every token's path through the workflow, including all transformations and state transitions
Time-Travel Debugging
Replay execution to any point in time by replaying ledger events
Provenance Tracking
Understand exactly how and why a token reached its final state
Deterministic Replay
Rebuild any execution state by replaying events from the ledger
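As a sketch of deterministic replay, the fold below rebuilds a per-node operation count from ledger events up to a cutoff timestamp. The event fields and counts are illustrative:

```typescript
// Time-travel by replay: fold over the ledger, stopping at a chosen
// timestamp, to reconstruct state as it was at that point.
interface ReplayEvent { timestamp: number; nodeId: string; operation: string; }

function replayUntil(events: ReplayEvent[], cutoff: number): Map<string, number> {
  const opsPerNode = new Map<string, number>();
  for (const e of events) {
    if (e.timestamp > cutoff) break; // events are time-ordered; stop here
    opsPerNode.set(e.nodeId, (opsPerNode.get(e.nodeId) ?? 0) + 1);
  }
  return opsPerNode;
}

const events: ReplayEvent[] = [
  { timestamp: 1, nodeId: "fsm", operation: "STATE_TRANSITION" },
  { timestamp: 2, nodeId: "fsm", operation: "STATE_TRANSITION" },
  { timestamp: 3, nodeId: "agg", operation: "TOKEN_COLLECTED" },
];
const stateAtT2 = replayUntil(events, 2);
```

Because the ledger is append-only and time-ordered, replaying to the same cutoff always yields the same state, which is what makes debugging "to any point in time" possible.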
Ledger Event Structure
{
"eventId": "evt_12345",
"timestamp": 1705322400000,
"nodeId": "process_node_1",
"operation": "TOKEN_PROCESSED",
"inputToken": {
"id": "token_001",
"correlationIds": ["solar_reading_001"],
"data": { "reading": 42.5 }
},
"outputToken": {
"id": "token_002",
"correlationIds": ["solar_reading_001"],
"data": { "reading": 42.5, "carbonCredits": 0.02125 }
}
}
API: Accessing Node State
Get Current Node State
// SDK Method
const nodeState = engine.getNodeState('node_id');
// Returns:
{
nodeId: 'compliance_fsm',
currentState: 'APPROVED',
tokensProcessed: 206,
aggregates: { APPROVED: 206, UNDER_REVIEW: 3 },
lastUpdate: 1705322400000
}
Use case: Check the current state of an FSM, Aggregator, or any other node
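A typed sketch of consuming `getNodeState()`; the `NodeState` interface mirrors the return shape shown above, and the stub engine stands in for a real SimulationEngine instance:

```typescript
// Hypothetical NodeState type matching the documented return shape.
interface NodeState {
  nodeId: string;
  currentState: string;
  tokensProcessed: number;
  aggregates: Record<string, number>;
  lastUpdate: number;
}

// Stub standing in for an initialized engine instance.
const engine = {
  getNodeState(nodeId: string): NodeState {
    return {
      nodeId,
      currentState: "APPROVED",
      tokensProcessed: 206,
      aggregates: { APPROVED: 206, UNDER_REVIEW: 3 },
      lastUpdate: 1705322400000,
    };
  },
};

const fsm = engine.getNodeState("compliance_fsm");
// Sum all aggregate buckets to get the total tokens seen by the FSM.
const total = Object.values(fsm.aggregates).reduce((a, b) => a + b, 0);
```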
Get Node Activity Log
// API Endpoint
GET /api/engine/templates/{templateId}/executions/{executionId}/nodes/{nodeId}/activity
// Returns:
[
{
"timestamp": 1705322400000,
"operation": "TOKEN_PROCESSED",
"tokenId": "token_001",
"details": { "input": {...}, "output": {...} }
},
...
]
Use case: View all operations performed by a specific node
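Calling the activity endpoint from TypeScript might look like the sketch below; the base URL and the response typing are assumptions, and only the path shape comes from the endpoint above:

```typescript
// Hypothetical client helper for the node-activity endpoint.
interface ActivityEntry {
  timestamp: number;
  operation: string;
  tokenId: string;
  details: { input: unknown; output: unknown };
}

function activityUrl(base: string, templateId: string, executionId: string, nodeId: string): string {
  return `${base}/api/engine/templates/${templateId}/executions/${executionId}/nodes/${nodeId}/activity`;
}

// Uses the global fetch available in Node 18+ and browsers.
async function fetchActivity(url: string): Promise<ActivityEntry[]> {
  const res = await fetch(url);
  if (!res.ok) throw new Error(`activity request failed: ${res.status}`);
  return res.json() as Promise<ActivityEntry[]>;
}

const url = activityUrl("http://localhost:3000", "tmpl_1", "exec_1", "process_node_1");
```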
Get Node Events
// API Endpoint
GET /api/engine/templates/{templateId}/executions/{executionId}/nodes/{nodeId}/events
// Returns ledger events filtered by node
[
{
"eventId": "evt_12345",
"timestamp": 1705322400000,
"operation": "STATE_TRANSITION",
"from": "PENDING",
"to": "APPROVED"
},
...
]
Use case: Audit trail for specific node operations
API: Token Provenance & Lineage
Get Token Lineage
// API Endpoint
GET /api/engine/templates/{templateId}/executions/{executionId}/tokens/{correlationId}/lineage
// Returns full path of token through workflow
{
"correlationId": "solar_reading_001",
"path": [
{
"nodeId": "solar_readings_source",
"timestamp": 1705322400000,
"operation": "TOKEN_GENERATED",
"data": { "reading": 42.5 }
},
{
"nodeId": "compliance_fsm",
"timestamp": 1705322401000,
"operation": "STATE_TRANSITION",
"from": "PENDING",
"to": "APPROVED",
"data": { "reading": 42.5, "anomalyCount": 0 }
},
{
"nodeId": "carbon_calculator",
"timestamp": 1705322402000,
"operation": "TOKEN_TRANSFORMED",
"data": { "reading": 42.5, "carbonCredits": 0.02125 }
},
{
"nodeId": "rec_minter",
"timestamp": 1705322403000,
"operation": "TOKEN_COLLECTED",
"data": { "reading": 42.5, "carbonCredits": 0.02125, "recId": "REC_001" }
}
]
}
Use case: Understand the complete journey of a token through the workflow
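The lineage response can be folded into a one-line trace; the `Lineage` types below mirror the response shape above, and the formatting is illustrative:

```typescript
// Summarize a lineage response as "node:operation -> node:operation -> ...".
interface LineageStep { nodeId: string; timestamp: number; operation: string; }
interface Lineage { correlationId: string; path: LineageStep[]; }

function traceOf(lineage: Lineage): string {
  return lineage.path.map((s) => `${s.nodeId}:${s.operation}`).join(" -> ");
}

// Sample data taken from the lineage response above.
const lineage: Lineage = {
  correlationId: "solar_reading_001",
  path: [
    { nodeId: "solar_readings_source", timestamp: 1705322400000, operation: "TOKEN_GENERATED" },
    { nodeId: "compliance_fsm", timestamp: 1705322401000, operation: "STATE_TRANSITION" },
    { nodeId: "carbon_calculator", timestamp: 1705322402000, operation: "TOKEN_TRANSFORMED" },
    { nodeId: "rec_minter", timestamp: 1705322403000, operation: "TOKEN_COLLECTED" },
  ],
};
const trace = traceOf(lineage);
```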
Correlation ID
Every token has one or more correlationIds that uniquely identify it across the workflow. Use these IDs to track tokens from source to sink.
Get Global Last Step
// SDK Method
const lastStep = engine.getCurrentTick();
// Returns current simulation tick/step number
const totalSteps = 1523; // Example
Use case: Know how many steps have been executed in the simulation
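One way to use the current tick is a simple progress estimate; the stub engine and the assumed total of 2000 steps are illustrative:

```typescript
// Stub standing in for a real engine exposing getCurrentTick().
const engine = { getCurrentTick: () => 1523 };

const plannedSteps = 2000; // assumed target length for this run
const progress = Math.round((engine.getCurrentTick() / plannedSteps) * 100);
```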
Get Per-Node Last Step
// SDK Method
const nodeState = engine.getNodeState('node_id');
const lastUpdate = nodeState.lastUpdate; // Timestamp of last operation
// Or via API
GET /api/engine/templates/{templateId}/executions/{executionId}/nodes/{nodeId}/activity?limit=1
// Returns most recent operation
[
{
"timestamp": 1705322403000, // Last update time
"operation": "TOKEN_PROCESSED",
"step": 1523
}
]
Use case: Check when a specific node last performed an operation
Summary
- SimulationEngine: Core execution class using a BFS strategy with an active task queue
- External Event Queue: Pre-loaded events mapped to DataSources via targetDataSourceId
- Event Ledger: Immutable log of all operations for auditability and time-travel debugging
- API Access: Full access to node state, activity logs, and token provenance via the REST API