Parser Query Language

Extract, reconcile, and transform multi-source event streams with SPL-like syntax

How It Works

Overview

The Parser Query Language lets you extract structured data from multiple heterogeneous data sources (streams) and reconcile events across these sources using correlation IDs. Each source can be in a different format (JSON, logs, CSV); the parser extracts fields from each source, then combines matching events into unified records.

Processing Flow

  1. Load Sources: Data files are loaded from Firebase Storage (e.g., JSON arrays, log files)
  2. Parse Each Source: Each source is interpreted as a list of events based on its delimiter
  3. Extract Fields: Fields are extracted using JSON dot notation or regex patterns
  4. Reconcile by GROUP_BY: Events with matching correlation IDs across ALL sources are combined
  5. Output Reconciled Events: Only events that exist in all specified sources are included
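
A minimal sketch of this flow in TypeScript (the interfaces and helper names here are illustrative, not the actual implementation; the helpers are sketched in the sections below):

// Illustrative pipeline; all names are hypothetical.
interface SourceSpec {
  id: string;                                    // e.g. "kafka-device-readings"
  delimiter: 'json-array' | 'newline' | 'csv' | 'tsv';
  fields: Record<string, string>;                // field name -> extraction pattern
}
interface ParsedQuery { sources: SourceSpec[]; groupBy: string; }

declare function loadSource(id: string): Promise<string>;
declare function splitEvents(raw: string, d: SourceSpec['delimiter']): unknown[];
declare function extractFields(e: unknown, fields: Record<string, string>): Record<string, unknown>;
declare function reconcile(perSource: Record<string, unknown>[][], key: string): Record<string, unknown>[];

async function runQuery(query: ParsedQuery): Promise<Record<string, unknown>[]> {
  const perSource: Record<string, unknown>[][] = [];
  for (const src of query.sources) {
    const raw = await loadSource(src.id);                             // 1. load
    const events = splitEvents(raw, src.delimiter);                   // 2. parse into events
    perSource.push(events.map((e) => extractFields(e, src.fields)));  // 3. extract
  }
  return reconcile(perSource, query.groupBy);                         // 4.-5. GROUP_BY join
}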

What Are Streams?

A stream (or source) is any data file that contains a sequence of events or records. Streams are stored in Firebase Storage, and each stream can use a different format:

File-Based Streams

Static files uploaded to Firebase Storage:

  • kafka-device-readings.json
  • device-stream.log
  • api-conversion-logs.json

Future Support

Planned support for real-time streams:

  • Kafka topics (real-time)
  • HTTP webhooks
  • Database change streams
  • Message queues (RabbitMQ, SQS)

Note

Currently, all streams are file-based and loaded from Firebase Storage at pled/raw-sources/. Each file represents a complete stream of events that can be processed and reconciled with other streams.
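
With the Firebase Admin SDK, loading one stream might look like the sketch below (the default bucket, the file-extension handling, and the loadSource name are assumptions, not the actual implementation):

import { getStorage } from 'firebase-admin/storage';

// Assumes initializeApp() was called with a default storageBucket configured.
async function loadSource(sourceId: string, ext = '.json'): Promise<string> {
  const file = getStorage().bucket().file(`pled/raw-sources/${sourceId}${ext}`);
  const [contents] = await file.download(); // resolves to a Buffer
  return contents.toString('utf8');
}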

Delimiters and Parsing

Each source is interpreted as a list of events based on its delimiter. The delimiter tells the parser how to split the raw data into individual events.

json-array

The file contains a JSON array where each element is one event.

// File: kafka-device-readings.json
[
  { "rawMessage": { "correlationId": "001", "deviceId": "DEV_1" } },
  { "rawMessage": { "correlationId": "002", "deviceId": "DEV_2" } }
]

Extraction: Use JSON dot notation like rawMessage.correlationId

newline

Each line in the file is one event (log files, plain text).

// File: device-stream.log
2025-01-15T10:00:00Z device=DEV_1 correlation_id=001 status=active
2025-01-15T10:05:00Z device=DEV_2 correlation_id=002 status=active

Extraction: Use regex patterns like correlation_id=([^ ]+) to extract fields

csv / tsv

Comma-separated or tab-separated values with an optional header row.

// File: sensor-data.csv
correlationId,deviceId,temperature,timestamp
001,DEV_1,22.5,2025-01-15T10:00:00Z
002,DEV_2,23.1,2025-01-15T10:05:00Z

Extraction: Access fields by column name (from the header row) or by position index
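
Putting the three delimiters together, splitting a raw file into events could look like this sketch (naive CSV handling with no quoted fields; the function name is illustrative):

// Illustrative splitter: one raw file in, a list of events out.
function splitEvents(raw: string, delimiter: 'json-array' | 'newline' | 'csv' | 'tsv'): unknown[] {
  switch (delimiter) {
    case 'json-array':
      return JSON.parse(raw) as unknown[];                    // each array element is one event
    case 'newline':
      return raw.split('\n').filter((l) => l.trim() !== '');  // each non-empty line is one event
    case 'csv':
    case 'tsv': {
      const sep = delimiter === 'csv' ? ',' : '\t';
      const [header, ...rows] = raw.trim().split('\n').map((l) => l.split(sep));
      // Each row becomes an object keyed by the header row's column names
      return rows.map((r) => Object.fromEntries(header.map((h, i) => [h, r[i]])));
    }
  }
}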

Field Extraction

JSON Dot Notation

For JSON sources (json-array delimiter), use dot notation to navigate nested objects:

Example Query

correlationId={rawMessage.correlationId} &&
deviceId={rawMessage.deviceId} &&
reading={rawMessage.readings.meterReading} &&
timestamp={rawMessage.timestamp}
FROM {kafka-device-readings:json-array}

Extracts From

{
  "rawMessage": {
    "correlationId": "solar_reading_001",
    "deviceId": "DEVICE_123",
    "readings": {
      "meterReading": 42.5
    },
    "timestamp": "2025-01-15T10:00:00Z"
  }
}
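
Resolving a dot-notation path is just a walk through nested keys; a minimal sketch (the getByPath name is illustrative):

// Resolve a path like "rawMessage.readings.meterReading" against one event.
function getByPath(event: unknown, path: string): unknown {
  return path.split('.').reduce<unknown>(
    (node, key) =>
      node !== null && typeof node === 'object'
        ? (node as Record<string, unknown>)[key]
        : undefined,
    event,
  );
}

// getByPath(event, 'rawMessage.readings.meterReading') -> 42.5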

Regex Patterns

For log files (newline delimiter), use regex with capture groups:

Example Query

timestamp={^([\d-]+T[\d:.]+Z)} &&
device={device=([^ ]+)} &&
correlationId={correlation_id=([^ ]+)} &&
status={status=(\w+)}
FROM {device-stream-log:newline}

Extracts From

2025-01-15T10:00:00Z device=DEVICE_123 correlation_id=solar_reading_001 status=active

Regex Explanation:

  • ^([\d-]+T[\d:.]+Z) - Match timestamp at start of line
  • device=([^ ]+) - Match "device=" followed by non-space characters
  • correlation_id=([^ ]+) - Extract correlation ID value
  • status=(\w+) - Match "status=" followed by word characters
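
Applying these patterns amounts to running each regex against the line and keeping the first capture group; a sketch (the extractFromLine name is illustrative):

// Extract named fields from one log line using capture-group patterns.
function extractFromLine(line: string, fields: Record<string, string>): Record<string, string | undefined> {
  const out: Record<string, string | undefined> = {};
  for (const [name, pattern] of Object.entries(fields)) {
    const match = line.match(new RegExp(pattern));
    out[name] = match?.[1]; // first capture group, or undefined if no match
  }
  return out;
}

// extractFromLine(line, { correlationId: 'correlation_id=([^ ]+)' })
// -> { correlationId: 'solar_reading_001' }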

Reconciliation with GROUP_BY

The GROUP_BY KEY={ field } clause matches and combines events from multiple sources based on a common correlation field. Each source in the query MUST extract the GROUP_BY field.

Requirements

  • Every source must define the GROUP_BY field (e.g., correlationId)
  • The GROUP_BY field must have the same name across all sources
  • Only events that exist in ALL sources with the same correlation ID are reconciled

Multi-Source Example

correlationId={rawMessage.correlationId} &&
deviceId={rawMessage.deviceId} &&
reading={rawMessage.readings.meterReading}
FROM {kafka-device-readings:json-array} &&

correlationId={request.headers.X-Correlation-ID} &&
conversionFactor={response.body.conversionFactor}
FROM {api-conversion-logs:json-array} &&

correlationId={event.correlationId} &&
temperature={event.weather.temperature}
FROM {api-weather-logs:json-array}

GROUP_BY KEY={correlationId}

Result:

If correlationId = "solar_reading_001" exists in:

  • kafka-device-readings.json (with deviceId + reading)
  • api-conversion-logs.json (with conversionFactor)
  • api-weather-logs.json (with temperature)

Then a reconciled event is created with all fields:

{
  "correlationId": "solar_reading_001",
  "deviceId": "DEVICE_123",
  "reading": 42.5,
  "conversionFactor": 0.0005,
  "temperature": 22.5
}

Important

If correlationId = "solar_reading_002" exists in only 2 out of 3 sources, it will NOT be included in the reconciled output. All sources must have matching events for reconciliation to occur.
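
In other words, reconciliation behaves like an inner join on the GROUP_BY key. A focused sketch (the merge order and the reconcile name are assumptions):

// Keep only keys present in every source, merging their fields into one record.
function reconcile(perSource: Record<string, unknown>[][], key: string): Record<string, unknown>[] {
  const indexed = perSource.map(
    (events) => new Map(events.map((e): [string, Record<string, unknown>] => [String(e[key]), e])),
  );
  const [first, ...rest] = indexed; // assumes at least one source
  const merged: Record<string, unknown>[] = [];
  for (const [k, event] of first) {
    if (rest.every((m) => m.has(k))) {
      // Later sources overwrite duplicate field names (Object.assign semantics)
      merged.push(Object.assign({}, event, ...rest.map((m) => m.get(k))));
    }
  }
  return merged;
}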

Query Syntax Reference

Basic Pattern

field1={path1} && field2={path2} FROM {source:delimiter} &&
field3={path3} FROM {source2:delimiter} GROUP_BY KEY={correlationField}

Field Definition

fieldName={ extraction_pattern }

Defines a field to extract. The pattern can be a JSON dot-notation path or a regex with a capture group.

Source Specification

FROM { source_id:delimiter_type }

Specifies which data source and how to parse it.

Field Separator

&&

Separates multiple field definitions within a source or between sources.

Reconciliation Key

GROUP_BY KEY={ field_name }

Specifies the field to use for matching events across sources.
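
Under the illustrative interfaces sketched in the Processing Flow section, the first two sources of the multi-source example above would map to a structure like this (hypothetical, for orientation only):

const query: ParsedQuery = {
  sources: [
    {
      id: 'kafka-device-readings',
      delimiter: 'json-array',
      fields: {
        correlationId: 'rawMessage.correlationId',
        deviceId: 'rawMessage.deviceId',
        reading: 'rawMessage.readings.meterReading',
      },
    },
    {
      id: 'api-conversion-logs',
      delimiter: 'json-array',
      fields: {
        correlationId: 'request.headers.X-Correlation-ID',
        conversionFactor: 'response.body.conversionFactor',
      },
    },
  ],
  groupBy: 'correlationId',
};

// const reconciled = await runQuery(query);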

Try it Live

Visit the Parser Visualization to see the query language in action with real data from the carbon credit demo (209 reconciled events across 4 sources):

Open Parser Visualization →