AWS Kinesis Transform Pack
Edge Delta Pipeline Pack for Kinesis Input Transform
Overview
The Edge Delta Kinesis Input Transform pack processes logs from Kinesis data streams. It unrolls the JSON records in each batch into individual logs, decodes each record's payload, and maps the result into a standardized format suitable for monitoring and analysis. These transformations streamline the integration of Kinesis log data into Edge Delta’s observability stack, making the data more useful and simplifying log handling.
Pack Description
1. Data Ingestion
The data flow starts at the Pack Source, the entry point into the pack where all logs begin processing.
2. JSON Unroll
Logs are first processed by the JSON Unroll Processor node, a json_unroll node:
- name: JSON Unroll Processor
  type: json_unroll
  field_path: 'item["body"]'
  json_field_path: Records
This node unrolls the JSON records from the Kinesis stream. It reads the body field, locates the Records array within it, and emits one log per array element. Breaking the bulk JSON payload into individual records lets each one be transformed and analyzed separately downstream.
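The unroll step can be sketched in Python as follows. This is a minimal illustration, not Edge Delta's implementation: the function name unroll_records is hypothetical, and it assumes, as the OTTL statements in the next step imply, that each unrolled log keeps its single record under the Records key.

```python
import json


def unroll_records(item: dict) -> list[dict]:
    """Split one log whose body holds {"Records": [...]} into one log per record.

    Hypothetical model: each output log keeps its element under the original
    "Records" key, which is the shape the downstream OTTL statements read.
    """
    parsed = json.loads(item["body"])
    unrolled = []
    for record in parsed.get("Records", []):
        # Copy the source log so any other fields on it are preserved.
        new_item = dict(item)
        new_item["body"] = json.dumps({"Records": record})
        unrolled.append(new_item)
    return unrolled


if __name__ == "__main__":
    batch = {"body": json.dumps({"Records": [{"eventID": "a"}, {"eventID": "b"}]})}
    for log in unroll_records(batch):
        print(log["body"])  # {"Records": {"eventID": "a"}}, then {"Records": {"eventID": "b"}}
```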
3. OTTL Transformation
The unrolled logs then flow to the OTTL Transform node.
- name: OTTL Transform
  type: ottl_transform
  statements: |-
    // Prepare for normalization
    set(cache["decoded_body"], Decode(body, "utf-8"))
    set(cache["parsed_body"], ParseJSON(cache["decoded_body"]))
    set(attributes, cache["parsed_body"]["Records"])
    set(body, Decode(attributes["kinesis"]["data"], "base64"))
    // Match ED OTEL format
    set(resource["ed.source.name"], attributes["eventSource"])
    set(resource["ed.source.type"], attributes["eventName"])
This node uses OTTL to process each unrolled record:
- Decoding and Parsing: It decodes the body as UTF-8, then parses it as JSON so the data is accessible as structured fields.
- Attribute Setting: It sets the log attributes to the parsed Kinesis record (the Records field) and replaces the body with the base64-decoded kinesis.data payload.
- Normalizing to OTEL Format: It sets the resource attributes ed.source.name and ed.source.type from eventSource and eventName, respectively, so the data matches the Edge Delta OTEL format. This consistent structure, familiar from monitoring tools, makes log analysis easier.
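The following Python sketch mirrors the OTTL statements one at a time to show what each does to a single unrolled log. It is an approximation under stated assumptions: the function name ottl_transform is hypothetical, the log is modeled as a plain dict with body, attributes, and resource keys, and Python's json and base64 modules stand in for OTTL's ParseJSON and Decode.

```python
import base64
import json


def ottl_transform(item: dict) -> dict:
    """Apply the pack's OTTL statements to one unrolled log (hypothetical model).

    The log is modeled as a dict with "body", "attributes", and "resource" keys.
    """
    body = item["body"]
    # set(cache["decoded_body"], Decode(body, "utf-8"))
    decoded_body = body.decode("utf-8") if isinstance(body, bytes) else body
    # set(cache["parsed_body"], ParseJSON(cache["decoded_body"]))
    parsed_body = json.loads(decoded_body)
    # set(attributes, cache["parsed_body"]["Records"]): the whole Kinesis record
    attributes = parsed_body["Records"]
    # set(body, Decode(attributes["kinesis"]["data"], "base64")): the record payload
    new_body = base64.b64decode(attributes["kinesis"]["data"], validate=True).decode("utf-8")
    # Map Kinesis event fields onto the Edge Delta resource attributes.
    resource = dict(item.get("resource", {}))
    resource["ed.source.name"] = attributes["eventSource"]
    resource["ed.source.type"] = attributes["eventName"]
    return {"body": new_body, "attributes": attributes, "resource": resource}


if __name__ == "__main__":
    record = {
        "kinesis": {"partitionKey": "1", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4="},
        "eventSource": "aws:kinesis",
        "eventName": "aws:kinesis:record",
    }
    log = {"body": json.dumps({"Records": record}).encode("utf-8")}
    out = ottl_transform(log)
    print(out["body"])      # This is only a test.
    print(out["resource"])  # {'ed.source.name': 'aws:kinesis', 'ed.source.type': 'aws:kinesis:record'}
```

Passing validate=True makes the sketch reject payloads containing non-base64 characters instead of silently skipping them; how the real OTTL Decode handles such input is not described here.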
4. Data Routing
The data is then conditionally routed to either the Successful Destination or the Fail Destination compound output node.
- name: Successful Destination
  type: compound_output
- name: Fail Destination
  type: compound_output
Logs processed successfully are routed to the Successful Destination, indicating that the transformation completed and the logs are ready for further processing or storage. Logs that encounter issues are routed to the Fail Destination, where they can be retried or diagnosed.
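The pack description does not spell out the routing condition, so the following is only a hypothetical Python sketch: it assumes a log goes to the Fail Destination when its transformation raises an error (for example, malformed JSON or an invalid base64 payload) and to the Successful Destination otherwise. The names decode_payload and route are illustrative, not part of Edge Delta.

```python
import base64
import json


def decode_payload(item: dict) -> dict:
    """Hypothetical stand-in for the OTTL step: decode one unrolled record."""
    record = json.loads(item["body"])["Records"]
    data = base64.b64decode(record["kinesis"]["data"], validate=True)
    return {**item, "body": data.decode("utf-8"), "attributes": record}


def route(items: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split logs into (successful, failed) lists based on whether decoding succeeds."""
    successful, failed = [], []
    for item in items:
        try:
            successful.append(decode_payload(item))
        # JSONDecodeError, binascii.Error, and UnicodeDecodeError all subclass
        # ValueError; KeyError and TypeError cover missing or mistyped fields.
        except (KeyError, TypeError, ValueError):
            failed.append(item)  # keep the original log intact for retries or diagnostics
    return successful, failed


if __name__ == "__main__":
    good = {"body": json.dumps({"Records": {"kinesis": {"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4="}}})}
    bad = {"body": json.dumps({"Records": {"kinesis": {"data": "not base64!"}}})}
    ok, failed = route([good, bad])
    print(len(ok), len(failed))  # 1 1
```

Keeping the unmodified log on the failure path matches the stated purpose of the Fail Destination: the original data remains available for a retry or for diagnosis.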
Sample Input
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
        "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
        "approximateArrivalTimestamp": 1545084711.166
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}
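As a worked example, the following Python sketch models roughly what the pack should produce for the second record in the sample input: JSON Unroll yields one log per record, and the OTTL step replaces the body with the decoded kinesis.data payload and sets the two resource attributes. It is a simplified model (the name expected_logs is hypothetical, and the attributes, which hold the full record, are omitted for brevity), not output captured from Edge Delta.

```python
import base64

# Second record of the sample input, trimmed to the fields the pack reads.
SAMPLE = {
    "Records": [
        {
            "kinesis": {"partitionKey": "1", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4="},
            "eventSource": "aws:kinesis",
            "eventName": "aws:kinesis:record",
        }
    ]
}


def expected_logs(batch: dict) -> list[dict]:
    """Model the pack end to end: one output log per record (attributes omitted)."""
    logs = []
    for record in batch["Records"]:  # JSON Unroll: one log per array element
        logs.append(
            {
                # OTTL: body = Decode(attributes["kinesis"]["data"], "base64")
                "body": base64.b64decode(record["kinesis"]["data"]).decode("utf-8"),
                # OTTL: resource["ed.source.name"] and resource["ed.source.type"]
                "resource": {
                    "ed.source.name": record["eventSource"],
                    "ed.source.type": record["eventName"],
                },
            }
        )
    return logs


if __name__ == "__main__":
    for log in expected_logs(SAMPLE):
        print(log["body"])  # This is only a test.
```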