End-to-End Examples
Complete Pipeline Example
This configuration defines a complete pipeline: it parses JSON logs from Kubernetes, extracts fields with OTTL, uses edx_code
for the more complex transformations, and forwards the results to an HTTP destination:
- name: kubernetes_input_busy
  type: kubernetes_input
  user_description: Application Logs
  include:
    - k8s.namespace.name=production
  log_parsing_mode: full
- name: transform_sequence
  type: sequence
  user_description: Data Transformation Pipeline
  processors:
    - type: ottl_transform
      metadata: '{"id":"data-prep","type":"ottl_transform","name":"Extract Fields"}'
      data_types:
        - log
      statements: |
        // Extract fields from parsed JSON body
        set(attributes["user_id"], body["user"]["id"])
        set(attributes["event_type"], body["event_type"])
        set(attributes["score"], body["score"])
        set(attributes["level"], body["level"])
        // Type conversion for numeric operations
        set(attributes["score"], Int(attributes["score"]))
    - type: ottl_transform
      metadata: '{"id":"transform","type":"ottl_transform","name":"Advanced Transforms"}'
      data_types:
        - log
      statements: |
        // Categorize score
        edx_code("item['attributes']['score_category'] = item['attributes']['score'] > 80 ? 'excellent' : item['attributes']['score'] > 50 ? 'good' : 'needs_improvement';")
        // Create alert object if level is ERROR or CRITICAL
        edx_code("{
          if (item['attributes']['level'] === 'ERROR' || item['attributes']['level'] === 'CRITICAL') {
            if (!item['attributes']['alert']) item['attributes']['alert'] = {};
            item['attributes']['alert']['user_id'] = item['attributes']['user_id'];
            item['attributes']['alert']['severity'] = item['attributes']['level'];
            item['attributes']['alert']['timestamp'] = new Date().toISOString();
          }
        }")
        // Generate summary
        edx_code("item['attributes']['summary'] = `User ${item['attributes']['user_id']}: ${item['attributes']['event_type']} - ${item['attributes']['score_category']}`;")
- name: http_output
  type: http_output
  user_description: Forward to Analytics
  endpoint: https://analytics.example.com/logs
  use_legacy_formatting: false
Input Sample
{
  "user": {
    "id": "user-123",
    "email": "test@example.com"
  },
  "event_type": "login",
  "score": 85,
  "level": "INFO"
}
Output Sample
{
  "user_id": "user-123",
  "event_type": "login",
  "score": 85,
  "level": "INFO",
  "score_category": "excellent",
  "summary": "User user-123: login - excellent"
}
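If the level were "ERROR", the output would also include an alert object (the timestamp is generated at processing time, so the value shown is illustrative):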
{
  "user_id": "user-123",
  "event_type": "login",
  "score": 85,
  "level": "ERROR",
  "score_category": "excellent",
  "alert": {
    "user_id": "user-123",
    "severity": "ERROR",
    "timestamp": "2025-09-28T14:23:45.678Z"
  },
  "summary": "User user-123: login - excellent"
}
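To sanity-check the expected output before deploying, the two transform steps can be replayed locally in plain JavaScript. This is a minimal sketch, not the agent's runtime: the `runPipeline` helper and the `item` shape (a `body` plus an `attributes` map) are assumptions made for illustration, and `Math.trunc(Number(...))` is only a stand-in for the OTTL `Int()` conversion.

```javascript
// Local sketch only: `item` mimics the shape edx_code is assumed to receive.
function runPipeline(body) {
  const item = { body, attributes: {} };
  const attrs = item.attributes;

  // Step 1: field extraction, mirroring the first ottl_transform.
  attrs.user_id = body.user.id;
  attrs.event_type = body.event_type;
  attrs.score = Math.trunc(Number(body.score)); // stand-in for OTTL Int()
  attrs.level = body.level;

  // Step 2: the same expressions used in the edx_code statements.
  attrs.score_category =
    attrs.score > 80 ? 'excellent' : attrs.score > 50 ? 'good' : 'needs_improvement';
  if (attrs.level === 'ERROR' || attrs.level === 'CRITICAL') {
    if (!attrs.alert) attrs.alert = {};
    attrs.alert.user_id = attrs.user_id;
    attrs.alert.severity = attrs.level;
    attrs.alert.timestamp = new Date().toISOString();
  }
  attrs.summary = `User ${attrs.user_id}: ${attrs.event_type} - ${attrs.score_category}`;
  return attrs;
}

const sample = {
  user: { id: 'user-123', email: 'test@example.com' },
  event_type: 'login',
  score: 85,
  level: 'INFO',
};

console.log(runPipeline(sample));                        // no alert key
console.log(runPipeline({ ...sample, level: 'ERROR' })); // includes alert
```

Running it with Node prints one attribute map per call; only the second contains an `alert` object, and its timestamp differs on every run.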
Use Case: Log Enrichment
Enrich logs with derived fields for better analysis:
// Derive user tier from score
edx_code("item['attributes']['user_tier'] = item['attributes']['score'] > 90 ? 'premium' : item['attributes']['score'] > 60 ? 'standard' : 'basic';")
// Add timestamp
edx_code("item['attributes']['enriched_at'] = new Date().toISOString();")
// Create searchable tags
edx_code("item['attributes']['tags'] = [item['attributes']['user_tier'], item['attributes']['level'], item['attributes']['event_type']];")
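The tier thresholds use strict `>` comparisons, so a score of exactly 90 lands in 'standard' and exactly 60 in 'basic'. The sketch below exercises those boundaries outside the pipeline. The `enrich` helper and its injectable `now` parameter are hypothetical, added only so the timestamp is repeatable in a local test.

```javascript
// Hypothetical helper mirroring the three enrichment statements.
function enrich(attributes, now = new Date()) {
  const enriched = { ...attributes };
  enriched.user_tier =
    enriched.score > 90 ? 'premium' : enriched.score > 60 ? 'standard' : 'basic';
  enriched.enriched_at = now.toISOString();
  enriched.tags = [enriched.user_tier, enriched.level, enriched.event_type];
  return enriched;
}

// A fixed clock keeps the output stable between runs.
const fixedNow = new Date('2025-09-28T14:23:45.678Z');

for (const score of [95, 90, 61, 60]) {
  const result = enrich({ score, level: 'INFO', event_type: 'login' }, fixedNow);
  console.log(score, result.user_tier, result.tags.join(','));
}
```

If `level` or `event_type` is absent, the corresponding `tags` entry is `undefined`, so it may be worth filtering the array before it is used for search.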
Use Case: Data Normalization
Normalize inconsistent data formats:
// Normalize level field (a missing level falls through to DEBUG)
edx_code("{
  const level = String(item['attributes']['level'] || '').toUpperCase();
  if (level === 'ERR' || level === 'ERROR') {
    item['attributes']['normalized_level'] = 'ERROR';
  } else if (level === 'WARN' || level === 'WARNING') {
    item['attributes']['normalized_level'] = 'WARN';
  } else if (level === 'INFO' || level === 'INFORMATION') {
    item['attributes']['normalized_level'] = 'INFO';
  } else {
    item['attributes']['normalized_level'] = 'DEBUG';
  }
}")
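Because the final branch is a catch-all, any value outside the listed spellings is normalized to 'DEBUG', including severities such as 'FATAL'. The sketch below makes that visible with a few sample inputs. `normalizeLevel` is a hypothetical helper for local testing, and it coerces the value with `String(... || '')` so that a missing level falls through to the default rather than throwing.

```javascript
// Hypothetical helper mirroring the normalization logic branch by branch.
function normalizeLevel(attributes) {
  // Coerce first so undefined or null does not throw on toUpperCase().
  const level = String(attributes.level || '').toUpperCase();
  if (level === 'ERR' || level === 'ERROR') return 'ERROR';
  if (level === 'WARN' || level === 'WARNING') return 'WARN';
  if (level === 'INFO' || level === 'INFORMATION') return 'INFO';
  return 'DEBUG'; // catch-all, same as the final else branch
}

const levelSamples = ['err', 'Warning', 'information', 'FATAL', undefined];
for (const raw of levelSamples) {
  console.log(String(raw), '->', normalizeLevel({ level: raw }));
}
```

If unknown severities should not be downgraded silently, one option is to add explicit branches for them before the catch-all.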
Use Case: Alert Generation
Generate alerts based on complex conditions:
edx_code("{
  if (item['attributes']['level'] === 'ERROR' && item['attributes']['retry_count'] > 3) {
    if (!item['attributes']['alert']) item['attributes']['alert'] = {};
    item['attributes']['alert']['type'] = 'critical';
    item['attributes']['alert']['message'] = `Repeated errors detected: ${item['attributes']['error_message']}`;
    item['attributes']['alert']['retry_count'] = item['attributes']['retry_count'];
    item['attributes']['alert']['requires_action'] = true;
  }
}")
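The alert fires only when both conditions hold, and `retry_count > 3` is strict, so the fourth retry is the first to trigger it. A small local sketch of that behavior follows. The `maybeAlert` helper and the sample error messages are hypothetical, chosen only to exercise each branch.

```javascript
// Hypothetical helper mirroring the alert-generation statement.
function maybeAlert(attributes) {
  if (attributes.level === 'ERROR' && attributes.retry_count > 3) {
    if (!attributes.alert) attributes.alert = {};
    attributes.alert.type = 'critical';
    attributes.alert.message = `Repeated errors detected: ${attributes.error_message}`;
    attributes.alert.retry_count = attributes.retry_count;
    attributes.alert.requires_action = true;
  }
  return attributes;
}

// Illustrative inputs: one that triggers, one on the boundary, one wrong level.
const alertCases = [
  { level: 'ERROR', retry_count: 4, error_message: 'connection refused' },
  { level: 'ERROR', retry_count: 3, error_message: 'connection refused' },
  { level: 'WARN', retry_count: 10, error_message: 'slow response' },
];

for (const attrs of alertCases) {
  const result = maybeAlert({ ...attrs });
  console.log(attrs.level, attrs.retry_count, result.alert ? result.alert.message : 'no alert');
}
```

If `error_message` is missing, the template literal renders the text `undefined` in the alert message, so a fallback value may be worth adding in production statements.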