End-to-End Examples

Complete pipeline configurations demonstrating edx_code in real-world scenarios.

Complete Pipeline Example

This configuration demonstrates a complete pipeline that parses JSON logs, uses edx_code for complex transformations, and forwards the results to a destination:

- name: kubernetes_input_busy
  type: kubernetes_input
  user_description: Application Logs
  include:
  - k8s.namespace.name=production
  log_parsing_mode: full

- name: transform_sequence
  type: sequence
  user_description: Data Transformation Pipeline
  processors:
  - type: ottl_transform
    metadata: '{"id":"data-prep","type":"ottl_transform","name":"Extract Fields"}'
    data_types:
    - log
    statements: |
      // Extract fields from parsed JSON body
      set(attributes["user_id"], body["user"]["id"])
      set(attributes["event_type"], body["event_type"])
      set(attributes["score"], body["score"])
      set(attributes["level"], body["level"])

      // Type conversion for numeric operations
      set(attributes["score"], Int(attributes["score"]))      

  - type: ottl_transform
    metadata: '{"id":"transform","type":"ottl_transform","name":"Advanced Transforms"}'
    data_types:
    - log
    statements: |
      // Categorize score
      edx_code("item['attributes']['score_category'] = item['attributes']['score'] > 80 ? 'excellent' : item['attributes']['score'] > 50 ? 'good' : 'needs_improvement';")

      // Create user object if critical
      edx_code("{
        if (item['attributes']['level'] === 'ERROR' || item['attributes']['level'] === 'CRITICAL') {
          if (!item['attributes']['alert']) item['attributes']['alert'] = {};
          item['attributes']['alert']['user_id'] = item['attributes']['user_id'];
          item['attributes']['alert']['severity'] = item['attributes']['level'];
          item['attributes']['alert']['timestamp'] = new Date().toISOString();
        }
      }")

      // Generate summary
      edx_code("item['attributes']['summary'] = `User ${item['attributes']['user_id']}: ${item['attributes']['event_type']} - ${item['attributes']['score_category']}`;")      

- name: http_output
  type: http_output
  user_description: Forward to Analytics
  endpoint: https://analytics.example.com/logs
  use_legacy_formatting: false

Input Sample

{
  "user": {
    "id": "user-123",
    "email": "test@example.com"
  },
  "event_type": "login",
  "score": 85,
  "level": "INFO"
}

Output Sample

{
  "user_id": "user-123",
  "event_type": "login",
  "score": 85,
  "level": "INFO",
  "score_category": "excellent",
  "summary": "User user-123: login - excellent"
}

If the level were “ERROR”:

{
  "user_id": "user-123",
  "event_type": "login",
  "score": 85,
  "level": "ERROR",
  "score_category": "excellent",
  "alert": {
    "user_id": "user-123",
    "severity": "ERROR",
    "timestamp": "2025-09-28T14:23:45.678Z"
  },
  "summary": "User user-123: login - excellent"
}

Use Case: Log Enrichment

Enrich logs with derived fields for better analysis:

// Extract user tier from score
edx_code("item['attributes']['user_tier'] = item['attributes']['score'] > 90 ? 'premium' : item['attributes']['score'] > 60 ? 'standard' : 'basic';")

// Add timestamp
edx_code("item['attributes']['enriched_at'] = new Date().toISOString();")

// Create searchable tags
edx_code("item['attributes']['tags'] = [item['attributes']['user_tier'], item['attributes']['level'], item['attributes']['event_type']];")

Use Case: Data Normalization

Normalize inconsistent data formats:

// Normalize level field
edx_code("{
  const level = item['attributes']['level'].toUpperCase();
  if (level === 'ERR' || level === 'ERROR') {
    item['attributes']['normalized_level'] = 'ERROR';
  } else if (level === 'WARN' || level === 'WARNING') {
    item['attributes']['normalized_level'] = 'WARN';
  } else if (level === 'INFO' || level === 'INFORMATION') {
    item['attributes']['normalized_level'] = 'INFO';
  } else {
    item['attributes']['normalized_level'] = 'DEBUG';
  }
}")

Use Case: Alert Generation

Generate alerts based on complex conditions:

edx_code("{
  if (item['attributes']['level'] === 'ERROR' && item['attributes']['retry_count'] > 3) {
    if (!item['attributes']['alert']) item['attributes']['alert'] = {};
    item['attributes']['alert']['type'] = 'critical';
    item['attributes']['alert']['message'] = `Repeated errors detected: ${item['attributes']['error_message']}`;
    item['attributes']['alert']['retry_count'] = item['attributes']['retry_count'];
    item['attributes']['alert']['requires_action'] = true;
  }
}")