Edge Delta Log Transform Node

Transform logs using upsert and delete operations.

Overview

The Log Transform node modifies log data as it flows through the pipeline, calculating a new value for each individual log that passes through the node. It supports upsert (add or replace) and delete operations. Certain core log fields are protected during transformation, such as body and resource in the OTEL schema.

The node considers only the following expressions:

  • String literals: represented by a single-quoted string (e.g. 'foo').
  • Field references: represented by unquoted identifiers with a preceding dot, or by bracket notation for structured log fields, such as item["resource"]["src_type"].
  • Special functions: a small collection of functions specified by Edge Delta, including environment(key) to resolve an environment variable and now() to provide the current time.
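
As a minimal sketch of the special functions, the fragment below stamps each log with the processing time and with the value of an environment variable. The node name, the attribute names, the CLUSTER_NAME variable, and the double-quoted argument to environment() are assumptions for illustration; only the function names now() and environment(key) come from the description above.

```yaml
nodes:
- name: log_transform_functions_sketch   # hypothetical node name
  type: log_transform
  transformations:
  # now() supplies the current time when the log is processed.
  - field_path: attributes.processed_at   # hypothetical attribute
    operation: upsert
    value: now()
  # environment(key) resolves an environment variable.
  # CLUSTER_NAME and the argument quoting style are assumptions.
  - field_path: attributes.cluster   # hypothetical attribute
    operation: upsert
    value: environment("CLUSTER_NAME")
```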

For delete operations, the default behavior is to do nothing if the field is not found.
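
A delete operation might look like the following sketch. It assumes that value can be omitted for delete, since no new value is computed. The node name is hypothetical, and instance_name is borrowed from the sample logs later on this page.

```yaml
nodes:
- name: log_transform_delete_sketch   # hypothetical node name
  type: log_transform
  transformations:
  # Removes attributes.instance_name when it exists.
  # Logs without that attribute pass through unchanged.
  - field_path: attributes.instance_name
    operation: delete   # assumption: no value is needed for delete
```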

Each transformation node is designed for enriching different sections of the data item:

Node | Attribute | Resource | Body | Timestamp | Any Field | Restrictions
Output Transform | Y | Y | Y | Y | Y | Outputs a Custom type data item that can't be ingested by the Edge Delta Archive node. The whole payload is flattened and sent as the event, with all other fields empty.
Log Transform | Y | N | N | Y | N | Can only ingest logs, and it outputs only logs.
Resource Transform | N | Y | N | N | N | Can only ingest logs, and it outputs only logs.
Mask | N | N | Y | N | N | Can only ingest logs, and it outputs only logs. New value can only be a static string.

Example Configuration 1

In this example, a static string is used along with two field references, one of which changes dynamically with each log processed.

nodes:
- name: log_transform_test
  type: log_transform
  transformations:
  - field_path: attributes.newfield
    operation: upsert
    value: '"I added this new field value"'
  - field_path: attributes.anothernewfield
    operation: upsert
    value: item["resource"]["host.name"]
  - field_path: attributes.dynamicfield
    operation: upsert
    value: json(item["body"]).outcome

Suppose the following logs are sent through this pipeline. Each has a different outcome value.

{"timestamp":"2023-04-23T12:34:56.789Z","logLevel":"ERROR","serviceName":"AuthService","nodeId":"node4","message":"Login failed due to incorrect password","clientIP":"192.168.1.10","username":"user123","event":"login_attempt","outcome":"failure"}
{"timestamp":"2023-04-23T12:36:10.123Z","logLevel":"INFO","serviceName":"AuthService","nodeId":"node4","message":"User login successful","clientIP":"192.168.1.15","username":"user456","event":"login_attempt","outcome":"success"}
{"timestamp":"2023-04-23T12:37:30.456Z","logLevel":"WARN","serviceName":"AuthService","nodeId":"node4","message":"Login delayed due to system load","clientIP":"192.168.1.20","username":"user789","event":"login_attempt","outcome":"delayed"}

In the output logs, the dynamicfield attribute shows a different value for each log (failure, success, and delayed), pulled from the outcome field within the log body.

Example Configuration 2

In the following example, some logs match the Log Transform node configuration and others do not.

nodes:
  - name: log_transform_example
    type: log_transform
    transformations:
    - field_path: attributes.parsed
      operation: upsert
      value: json(item["body"])
    - field_path: attributes.original_timestamp
      operation: upsert
      value: item["attributes"]["parsed"]["timestamp"]
    - field_path: parsed_body
      operation: upsert
      value: item["attributes"]["parsed"]["msg"]

Example Logs

Matching Input

{
    "timestamp": "1581452773000000789",
    "body": "{\"timestamp\": \"2023/07/11 09:40:21\",\"msg\": \"Failed to do something\"}",
    "resource": {
      "host": "host-1",
      "tag": "app-dev",
      "__source": {
        "type": "K8s",
        "short_name": "short_source_name",
        "name": "source_name",
        "group_name": "group_name",
        "logical_name": "logical_source"
      },
      "k8s.namespace.name": "edgedelta",
      "k8s.pod.name": "api-deployment-d79fab72249c",
      "k8s.container.name": "echo:latest",
      "k8s.controller.logical_name":"Deployment",
      "k8s.labels.app":"my-api"
    },
    "type": "log",
    "attributes": {
        "pod_id":"api-deployment-d79fab72249c-vtq9x",
        "instance_id":"i-1234567890abcdef0",
        "instance_name":"test-name"
    }
}

Matching Output

{
    "timestamp": "1581452773000000789",
    "body": "Failed to do something",
    "resource": {
      "host": "host-1",
      "tag": "app-dev",
      "__source": {
        "type": "K8s",
        "short_name": "short_source_name",
        "name": "source_name",
        "group_name": "group_name",
        "logical_name": "logical_source"
      },
      "k8s.namespace.name": "edgedelta",
      "k8s.pod.name": "api-deployment-d79fab72249c",
      "k8s.container.name": "echo:latest",
      "k8s.controller.logical_name":"Deployment",
      "k8s.labels.app":"my-api"
    },
    "type": "log",
    "attributes": {
        "pod_id":"api-deployment-d79fab72249c-vtq9x",
        "instance_id":"i-1234567890abcdef0",
        "instance_name":"test-name",
        "parsed": {
          "timestamp": "2023/07/11 09:40:21",
          "msg": "Failed to do something"
        },
        "original_timestamp": "2023/07/11 09:40:21"
    }
}

Not Matching Input

{
    "timestamp": "1581452773000000789",
    "body": "hello world",
    "resource": {
      "host": "host-1",
      "tag": "app-dev",
      "src_type": "K8s",
      "__short_src_name": "short_source_name",
      "__src_name": "source_name",
      "__group_name": "group_name",
      "__logical_source": "logical_source",
      "k8s.namespace.name": "edgedelta",
      "k8s.pod.name": "api-deployment-d79fab72249c",
      "k8s.container.name": "echo:latest",
      "k8s.controller.logical_name":"Deployment",
      "k8s.labels.app":"my-api"
    },
    "type": "log",
    "attributes": {
        "pod_id":"api-deployment-d79fab72249c-vtq9x",
        "instance_id":"i-1234567890abcdef0",
        "instance_name":"test-name"
    }
}

Not Matching Output

{
    "timestamp": "1581452773000000789",
    "body": "",
    "resource": {
      "host": "host-1",
      "tag": "app-dev",
      "src_type": "K8s",
      "__short_src_name": "short_source_name",
      "__src_name": "source_name",
      "__group_name": "group_name",
      "__logical_source": "logical_source",
      "k8s.namespace.name": "edgedelta",
      "k8s.pod.name": "api-deployment-d79fab72249c",
      "k8s.container.name": "echo:latest",
      "k8s.controller.logical_name":"Deployment",
      "k8s.labels.app":"my-api"
    },
    "type": "log",
    "attributes": {
        "pod_id":"api-deployment-d79fab72249c-vtq9x",
        "instance_id":"i-1234567890abcdef0",
        "instance_name":"test-name",
        "parsed": "",
        "original_timestamp": ""
    }
}

In the non-matching example, the log body is not in JSON format. Because JSON parsing of the log message fails, an empty string is assigned to the new fields. The body field, which stores the log message, is also replaced with an empty string, so the original log message is lost.

Required Parameters

name

A descriptive name for the node. This name appears in Visual Pipelines, and you can use it to reference the node in the YAML. It must be unique across all nodes. Because it is a YAML list element, it begins with a - and a space, followed by the string. It is a required parameter for all nodes.

nodes:
  - name: <node name>
    type: <node type>

type: log_transform

The type parameter specifies the type of node being configured. It is specified as a string from a closed list of node types. It is a required parameter.

nodes:
  - name: <node name>
    type: <node type>

transformations

The transformations parameter is used to specify the log transformation operations. It consists of three child parameters:

  • operation specifies the transformation operation. Currently it can be delete or upsert, which updates the field if it exists or adds it if it doesn’t exist.
  • field_path is the dot-separated path where the operation should be applied.
  • value is the CEL expression that determines the value to be applied. You can use CEL macros. With the OTEL schema, the CEL indexing method must be used, for example item["resource"]["host.name"], because some OTEL field names already contain dots.

nodes:
  - name: <node name>
    type: log_transform
    transformations:
    - operation: upsert|delete
      field_path: <dot separated path>
      value: <CEL expression>