Parser Query Language
Extract, reconcile, and transform multi-source event streams with SPL-like syntax
How It Works
Overview
The Parser Query Language allows you to extract structured data from multiple heterogeneous data sources (streams) and reconcile events across these sources using correlation IDs. Each source can be in a different format (JSON, logs, CSV); the parser extracts fields from every source and then combines matching events into unified records.
Processing Flow
1. Load Sources: Data files are loaded from Firebase Storage (e.g., JSON arrays, log files)
2. Parse Each Source: Each source is interpreted as a list of events based on its delimiter
3. Extract Fields: Fields are extracted using JSON dot notation or regex patterns
4. Reconcile by GROUP_BY: Events with matching correlation IDs across ALL sources are combined
5. Output Reconciled Events: Only events that exist in all specified sources are included
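As a rough illustration of these five steps, the sketch below runs the whole flow on two tiny in-memory sources (a JSON array and a newline-delimited log). All names here are illustrative and the loading step is stubbed with inline strings rather than Firebase Storage; the actual parser implementation may differ.

// Illustrative end-to-end sketch (hypothetical names, not the project's actual code).
// Step 1 (Load Sources) is stubbed with in-memory strings instead of Firebase Storage.
const sources: Record<string, string> = {
  "kafka-device-readings": JSON.stringify([
    { rawMessage: { correlationId: "001", deviceId: "DEV_1" } },
    { rawMessage: { correlationId: "002", deviceId: "DEV_2" } },
  ]),
  "device-stream-log": [
    "2025-01-15T10:00:00Z device=DEV_1 correlation_id=001 status=active",
    "2025-01-15T10:05:00Z device=DEV_2 correlation_id=002 status=active",
  ].join("\n"),
};

// Step 2 (Parse Each Source): split raw data into events per delimiter.
const jsonEvents: any[] = JSON.parse(sources["kafka-device-readings"]); // json-array
const logEvents: string[] = sources["device-stream-log"].split("\n");   // newline

// Step 3 (Extract Fields): dot notation for JSON, regex capture groups for logs.
const fromJson = jsonEvents.map(e => ({
  correlationId: String(e.rawMessage.correlationId),
  deviceId: String(e.rawMessage.deviceId),
}));
const fromLog = logEvents.map(line => ({
  correlationId: /correlation_id=([^ ]+)/.exec(line)?.[1] ?? "",
  status: /status=(\w+)/.exec(line)?.[1] ?? "",
}));

// Steps 4-5 (Reconcile + Output): keep only IDs present in both sources, merge their fields.
const reconciled = fromJson.flatMap(a => {
  const b = fromLog.find(x => x.correlationId === a.correlationId);
  return b ? [{ ...a, ...b }] : [];
});

console.log(reconciled);
// [ { correlationId: "001", deviceId: "DEV_1", status: "active" },
//   { correlationId: "002", deviceId: "DEV_2", status: "active" } ]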
What Are Streams?
A stream (or source) is any data file that contains a sequence of events or records. Each stream is stored in Firebase Storage and can be in a different format:
File-Based Streams
Static files uploaded to Firebase Storage:
- kafka-device-readings.json
- device-stream.log
- api-conversion-logs.json
Future Support
Planned support for real-time streams:
- Kafka topics (real-time)
- HTTP webhooks
- Database change streams
- Message queues (RabbitMQ, SQS)
Note
Currently, all streams are file-based and loaded from Firebase Storage at pled/raw-sources/. Each file represents a complete stream of events that can be processed and reconciled with other streams.
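For reference, fetching one of these files with the Firebase web SDK could look like the sketch below. The loadRawSource helper and the Firebase config are hypothetical; only the pled/raw-sources/ path comes from this page.

// Sketch: loading a raw source file from Firebase Storage (web modular SDK).
import { initializeApp } from "firebase/app";
import { getStorage, ref, getDownloadURL } from "firebase/storage";

const app = initializeApp({ /* your Firebase project config (placeholder) */ });
const storage = getStorage(app);

async function loadRawSource(fileName: string): Promise<string> {
  const fileRef = ref(storage, `pled/raw-sources/${fileName}`);
  const url = await getDownloadURL(fileRef); // download URL for the stored object
  const response = await fetch(url);
  return response.text();                    // raw stream contents as text
}

// Example: loadRawSource("kafka-device-readings.json").then(text => JSON.parse(text));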
Delimiters and Parsing
Each source is interpreted as a list of events based on its delimiter. The delimiter tells the parser how to split the raw data into individual events.
json-array
The file contains a JSON array where each element is one event.
// File: kafka-device-readings.json
[
{ "rawMessage": { "correlationId": "001", "deviceId": "DEV_1" } },
{ "rawMessage": { "correlationId": "002", "deviceId": "DEV_2" } }
]

Extraction: Use JSON dot notation like rawMessage.correlationId
newline
Each line in the file is one event (log files, plain text).
// File: device-stream.log
2025-01-15T10:00:00Z device=DEV_1 correlation_id=001 status=active
2025-01-15T10:05:00Z device=DEV_2 correlation_id=002 status=active

Extraction: Use regex patterns like correlation_id=([^ ]+) to extract fields
csv / tsv
Comma-separated or tab-separated values with optional header row.
// File: sensor-data.csv
correlationId,deviceId,temperature,timestamp
001,DEV_1,22.5,2025-01-15T10:00:00Z
002,DEV_2,23.1,2025-01-15T10:05:00Z

Extraction: Access by column name or index
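A simplified version of this splitting step, covering the delimiter types above, might look like the sketch below. The splitEvents helper is illustrative; the real parser may handle quoting and other edge cases differently.

// Sketch: turning raw file contents into a list of events, per delimiter type.
type Delimiter = "json-array" | "newline" | "csv" | "tsv";

function splitEvents(raw: string, delimiter: Delimiter): object[] | string[] {
  switch (delimiter) {
    case "json-array":
      return JSON.parse(raw) as object[];                   // each array element is one event
    case "newline":
      return raw.split("\n").filter(l => l.trim() !== "");  // each non-empty line is one event
    case "csv":
    case "tsv": {
      const sep = delimiter === "csv" ? "," : "\t";
      const [header, ...rows] = raw.trim().split("\n");
      const columns = header.split(sep);
      // Each row becomes an object keyed by the header row's column names.
      return rows.map(row => {
        const values = row.split(sep);
        return Object.fromEntries(columns.map((c, i): [string, string] => [c, values[i]]));
      });
    }
    default:
      throw new Error(`Unsupported delimiter: ${delimiter}`);
  }
}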
Field Extraction
JSON Dot Notation
For JSON sources (json-array delimiter), use dot notation to navigate nested objects:
Example Query
correlationId={rawMessage.correlationId} &&
deviceId={rawMessage.deviceId} &&
reading={rawMessage.readings.meterReading} &&
timestamp={rawMessage.timestamp}
FROM {kafka-device-readings:json-array}

Extracts From
{
"rawMessage": {
"correlationId": "solar_reading_001",
"deviceId": "DEVICE_123",
"readings": {
"meterReading": 42.5
},
"timestamp": "2025-01-15T10:00:00Z"
}
}
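Under the hood, a dot path such as rawMessage.readings.meterReading can be resolved by walking the parsed object one key at a time. A minimal sketch (the getByPath helper is hypothetical):

// Sketch: resolving a JSON dot path against a parsed event.
function getByPath(event: unknown, path: string): unknown {
  return path.split(".").reduce<unknown>(
    (value, key) =>
      value !== null && typeof value === "object"
        ? (value as Record<string, unknown>)[key]
        : undefined,
    event,
  );
}

const event = { rawMessage: { readings: { meterReading: 42.5 } } };
console.log(getByPath(event, "rawMessage.readings.meterReading")); // 42.5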
Regex Patterns

For log files (newline delimiter), use regex with capture groups:
Example Query
timestamp={^([\d-]+T[\d:.]+Z)} &&
device={device=([^ ]+)} &&
correlationId={correlation_id=([^ ]+)} &&
status={status=(\w+)}
FROM {device-stream-log:newline}

Extracts From
2025-01-15T10:00:00Z device=DEVICE_123 correlation_id=solar_reading_001 status=active

Regex Explanation:
- ^([\d-]+T[\d:.]+Z) - Match the timestamp at the start of the line
- device=([^ ]+) - Match "device=" followed by non-space characters
- correlation_id=([^ ]+) - Extract the correlation ID value
- status=(\w+) - Match "status=" followed by word characters
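Applied to the log line above, each pattern's first capture group becomes the field value. A minimal sketch of that step (the extractFromLine helper is illustrative):

// Sketch: extracting fields from one log line using the patterns above.
const patterns: Record<string, RegExp> = {
  timestamp: /^([\d-]+T[\d:.]+Z)/,
  device: /device=([^ ]+)/,
  correlationId: /correlation_id=([^ ]+)/,
  status: /status=(\w+)/,
};

function extractFromLine(line: string): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [field, re] of Object.entries(patterns)) {
    const match = re.exec(line);
    if (match) out[field] = match[1]; // first capture group is the field value
  }
  return out;
}

const line = "2025-01-15T10:00:00Z device=DEVICE_123 correlation_id=solar_reading_001 status=active";
console.log(extractFromLine(line));
// { timestamp: "2025-01-15T10:00:00Z", device: "DEVICE_123",
//   correlationId: "solar_reading_001", status: "active" }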
Reconciliation with GROUP_BY
The GROUP_BY KEY={ field } clause is used to match and combine events from multiple sources based on a common correlation field. Each source in the query MUST extract the GROUP_BY field.
Requirements
- Every source must define the GROUP_BY field (e.g., correlationId)
- The GROUP_BY field must have the same name across all sources
- Only events that exist in ALL sources with the same correlation ID are reconciled
Multi-Source Example
correlationId={rawMessage.correlationId} &&
deviceId={rawMessage.deviceId} &&
reading={rawMessage.readings.meterReading}
FROM {kafka-device-readings:json-array} &&
correlationId={request.headers.X-Correlation-ID} &&
conversionFactor={response.body.conversionFactor}
FROM {api-conversion-logs:json-array} &&
correlationId={event.correlationId} &&
temperature={event.weather.temperature}
FROM {api-weather-logs:json-array}
GROUP_BY KEY={correlationId}

Result:
If correlationId = "solar_reading_001" exists in:
- ✓ kafka-device-readings.json (with deviceId + reading)
- ✓ api-conversion-logs.json (with conversionFactor)
- ✓ api-weather-logs.json (with temperature)
Then a reconciled event is created with all fields:
{
"correlationId": "solar_reading_001",
"deviceId": "DEVICE_123",
"reading": 42.5,
"conversionFactor": 0.0005,
"temperature": 22.5
}

Important
If correlationId = "solar_reading_002" exists in only 2 out of 3 sources, it will NOT be included in the reconciled output. All sources must have matching events for reconciliation to occur.
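In effect, reconciliation behaves like an inner join on the GROUP_BY key across every source. The sketch below shows that behavior for the three-source example above; the reconcile helper is hypothetical and assumes each source has already been reduced to a list of extracted events keyed by correlationId, with illustrative sample values.

// Sketch: keep only correlation IDs present in every source, then merge their fields.
type ExtractedEvent = Record<string, unknown> & { correlationId: string };

function reconcile(sources: ExtractedEvent[][]): Record<string, unknown>[] {
  // Index each source's events by correlation ID.
  const indexed = sources.map(
    events => new Map(events.map((e): [string, ExtractedEvent] => [e.correlationId, e])),
  );
  const [first, ...rest] = indexed;
  const reconciled: Record<string, unknown>[] = [];
  for (const [id, event] of first) {
    // An ID qualifies only if every other source also has an event for it.
    if (rest.every(m => m.has(id))) {
      reconciled.push(Object.assign({}, event, ...rest.map(m => m.get(id))));
    }
  }
  return reconciled;
}

// "solar_reading_002" appears in only two of the three sources, so it is dropped.
console.log(reconcile([
  [{ correlationId: "solar_reading_001", deviceId: "DEVICE_123", reading: 42.5 },
   { correlationId: "solar_reading_002", deviceId: "DEVICE_124", reading: 40.1 }],
  [{ correlationId: "solar_reading_001", conversionFactor: 0.0005 },
   { correlationId: "solar_reading_002", conversionFactor: 0.0005 }],
  [{ correlationId: "solar_reading_001", temperature: 22.5 }],
]));
// → [ { correlationId: "solar_reading_001", deviceId: "DEVICE_123", reading: 42.5,
//       conversionFactor: 0.0005, temperature: 22.5 } ]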
Query Syntax Reference
Basic Pattern
field1={path1} && field2={path2} FROM {source:delimiter} &&
field3={path3} FROM {source2:delimiter} GROUP_BY KEY={correlationField}

Field Definition
fieldName={ extraction_pattern }
Defines a field to extract. The pattern can be a JSON dot path or a regex with a capture group.
Source Specification
FROM { source_id:delimiter_type }
Specifies which data source and how to parse it.
Field Separator
&&
Separates multiple field definitions within a source or between sources.
Reconciliation Key
GROUP_BY KEY={ field_name }
Specifies the field to use for matching events across sources.
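Putting the reference together, a simplified parse of a query string into its sources, field patterns, and reconciliation key could look like the sketch below. This is a rough approximation of the grammar above, not the project's actual parser; it ignores whitespace and escaping edge cases.

// Sketch: splitting a query into per-source field maps plus the GROUP_BY key.
interface SourceSpec {
  sourceId: string;
  delimiter: string;
  fields: Record<string, string>; // fieldName -> extraction pattern
}

function parseQuery(query: string): { sources: SourceSpec[]; groupByKey: string } {
  // Separate the GROUP_BY clause from the field/source definitions.
  const [body, groupByPart] = query.split(/GROUP_BY\s+KEY=\{/);
  const groupByKey = (groupByPart ?? "").split("}")[0].trim();

  // Each "field={pattern} && ... FROM {source:delimiter}" block describes one source.
  const sources: SourceSpec[] = [];
  const blockRe = /((?:\w+=\{[^}]*\}\s*&&\s*)*\w+=\{[^}]*\})\s*FROM\s*\{([^:}]+):([^}]+)\}/g;
  for (const block of body.matchAll(blockRe)) {
    const fields: Record<string, string> = {};
    for (const f of block[1].matchAll(/(\w+)=\{([^}]*)\}/g)) {
      fields[f[1]] = f[2].trim();
    }
    sources.push({ sourceId: block[2].trim(), delimiter: block[3].trim(), fields });
  }
  return { sources, groupByKey };
}

// Example:
// parseQuery("correlationId={rawMessage.correlationId} FROM {kafka-device-readings:json-array} GROUP_BY KEY={correlationId}")
// → { sources: [ { sourceId: "kafka-device-readings", delimiter: "json-array",
//      fields: { correlationId: "rawMessage.correlationId" } } ], groupByKey: "correlationId" }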
Try it Live
Visit the Parser Visualization to see the query language in action with real data from the carbon credit demo (209 reconciled events across 4 sources):
Open Parser Visualization →