AWS Kinesis Transform Pack

This pack processes AWS Kinesis logs: it unrolls the JSON records, then parses them and extracts their fields.

Edge Delta Pipeline Pack for Kinesis Input Transform

Overview

The Edge Delta Kinesis Input Transform pack unrolls JSON records from Kinesis data streams and transforms them into a standardized format for monitoring and analysis. This streamlines the integration of Kinesis log data into Edge Delta’s observability stack and simplifies log handling on the system side.

Pack Description

1. Data Ingestion

The data flow starts at the Pack Source, the entry point through which all logs enter the pack.

2. JSON Unroll

Logs are then processed by the JSON Unroll Processor, a node of type json_unroll.

- name: JSON Unroll Processor
  type: json_unroll
  field_path: 'item["body"]'
  json_field_path: Records

This node unrolls the JSON records delivered by the Kinesis stream. It reads the body field and extracts the Records field from it, breaking the bulk JSON payload into individual records. Each record can then be analyzed and transformed separately in the following steps.
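
The unrolling step can be pictured with a short Python sketch. This is a conceptual illustration only, not the node's implementation: the function name unroll_records is hypothetical, and passing a payload through unchanged when it has no Records array is an assumption made for the example.

```python
import json


def unroll_records(body: str, json_field_path: str = "Records") -> list:
    """Split one bulk JSON payload into one item per element of the named array."""
    payload = json.loads(body)
    records = payload.get(json_field_path) if isinstance(payload, dict) else None
    if not isinstance(records, list):
        # Assumption for this sketch: nothing to unroll, so keep the payload as one item.
        return [payload]
    return records


# A bulk payload carrying two records becomes two separate items.
bulk = json.dumps({"Records": [{"eventID": "a"}, {"eventID": "b"}]})
items = unroll_records(bulk)
print(len(items))  # 2
```

Each returned item corresponds to one element of Records, which is the shape the next node works on.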

3. OTTL Transformation

The unrolled records flow to the OTTL Transform node.

- name: OTTL Transform
  type: ottl_transform
  statements: |-
    // Prepare for normalization
    set(cache["decoded_body"], Decode(body, "utf-8"))
    set(cache["parsed_body"],ParseJSON(cache["decoded_body"]))
    set(attributes,cache["parsed_body"]["Records"])
    set(body,Decode(attributes["kinesis"]["data"],"base64"))

    // Match ED OTEL format
    set(resource["ed.source.name"],attributes["eventSource"])
    set(resource["ed.source.type"],attributes["eventName"])

This node uses OTTL statements to process the records extracted in the previous step:

  1. Decoding and Parsing: It decodes the body as UTF-8, then parses the JSON so that the data is accessible as structured fields.
  2. Attribute Setting: It sets the attributes from the Records field of the parsed JSON, then replaces the body with the base64-decoded kinesis data payload.
  3. Normalizing to OTEL Format: It sets the ed.source.name and ed.source.type resource fields from eventSource and eventName so that the data matches the Edge Delta OTEL format. This keeps the logs consistent with other sources and easier to analyze.
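
The effect of these statements on a single record can be approximated in Python. The sketch below is not OTTL and is not how Edge Delta evaluates the statements; normalize_record and the dictionary layout of the result are hypothetical names chosen for illustration. The record reuses values from the second entry of the sample input.

```python
import base64


def normalize_record(record: dict) -> dict:
    """Mimic the transform: decode the payload and map the source fields."""
    # The Kinesis payload is base64-encoded text; decode it to a readable body.
    body = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
    return {
        "body": body,
        "attributes": record,
        "resource": {
            "ed.source.name": record["eventSource"],
            "ed.source.type": record["eventName"],
        },
    }


record = {
    "kinesis": {"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4="},
    "eventSource": "aws:kinesis",
    "eventName": "aws:kinesis:record",
}
item = normalize_record(record)
print(item["body"])      # This is only a test.
print(item["resource"])  # {'ed.source.name': 'aws:kinesis', 'ed.source.type': 'aws:kinesis:record'}
```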

4. Data Routing

The data is conditionally routed to either Successful Destination or Fail Destination compound output nodes.

- name: Successful Destination
  type: compound_output

- name: Fail Destination
  type: compound_output

Logs processed successfully are routed to the Successful Destination, which indicates that the transformation completed and the logs are ready for further processing or storage. Logs that encounter issues are routed to the Fail Destination, where they can be retried or diagnosed.
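
As a rough illustration of the routing idea, the Python sketch below sends a record to one of two destinations depending on whether its payload decodes cleanly. The pack's actual routing condition is not stated here, so treating a missing field or a decoding error as a failure is an assumption, and the route function is hypothetical.

```python
import base64
import binascii


def route(record: dict) -> tuple:
    """Return (destination name, payload) for one unrolled record."""
    try:
        raw = base64.b64decode(record["kinesis"]["data"], validate=True)
        return "Successful Destination", raw.decode("utf-8")
    except (KeyError, TypeError, binascii.Error, UnicodeDecodeError):
        # Assumption for this sketch: any missing field or decode error counts as a failure,
        # and the untouched record is kept so it can be retried or inspected.
        return "Fail Destination", record


good = {"kinesis": {"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4="}}
bad = {"kinesis": {"data": "not base64!!"}}
print(route(good)[0])  # Successful Destination
print(route(bad)[0])   # Fail Destination
```

Keeping the original record on the failure path preserves the input that caused the problem for later diagnosis.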

Sample Input

{"Records": [{"kinesis": {"kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8s7HR5a3MgaPFgYSBD61XN0Lg==", "approximateArrivalTimestamp": 1545084650.987}, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:495903382714902566085596925383615736588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"}, {"kinesis": {"kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166}, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"}]}