Edge Delta Log Transform Node
Overview
The Log Transform node modifies log data as it flows through the pipeline, calculating a new value for each individual log that passes through the node. It supports upsert (add or replace) and delete operations. Certain core log fields, such as body and resource in the OTEL schema, are protected during transformation.
The node considers only the following expressions:
- String literals: represented by a single-quoted string (e.g. 'foo').
- Field references: represented by unquoted identifiers with a preceding dot, or by bracket notation for structured log fields, such as item["resource"]["src_type"].
- A small collection of Edge Delta special functions, including environment(key) to resolve an environment variable and now() to provide the current time.
For deletion cases the default behavior is to do nothing if the field is not found.
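As a minimal sketch of a delete operation, the configuration below removes one attribute. The node name and the chosen field are hypothetical, and omitting value for a delete is an assumption based on there being nothing to assign.

```yaml
nodes:
  - name: log_transform_delete_example   # hypothetical node name
    type: log_transform
    transformations:
      # Removes attributes.instance_name when it is present.
      # As described above, a log without this field is left unchanged.
      - field_path: attributes.instance_name
        operation: delete
        # Assumption: no value is supplied for delete.
```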
Each transformation node is designed for enriching different sections of the data item:
Node | Attribute | Resource | Body | Timestamp | Any Field | Restrictions |
---|---|---|---|---|---|---|
Output Transform | Y | Y | Y | Y | Y | Outputs a Custom type data item that cannot be ingested by the Edge Delta Archive node. The whole payload is flattened and sent as the event, with all other fields empty. |
Log Transform | Y | N | N | Y | N | Can only ingest logs, and it outputs only logs. |
Resource Transform | N | Y | N | N | N | Can only ingest logs, and it outputs only logs. |
Mask | N | N | Y | N | N | Can only ingest logs, and it outputs only logs. New value can only be a static string. |
Example Configuration 1
In this example, a static string is used along with two field references, one of which changes dynamically with each log processed.
nodes:
  - name: log_transform_test
    type: log_transform
    transformations:
      - field_path: attributes.newfield
        operation: upsert
        value: '"I added this new field value"'
      - field_path: attributes.anothernewfield
        operation: upsert
        value: item["resource"]["host.name"]
      - field_path: attributes.dynamicfield
        operation: upsert
        value: json(item["body"]).outcome
Suppose the following logs are sent through this pipeline. Each has a different outcome value.
{"timestamp":"2023-04-23T12:34:56.789Z","logLevel":"ERROR","serviceName":"AuthService","nodeId":"node4","message":"Login failed due to incorrect password","clientIP":"192.168.1.10","username":"user123","event":"login_attempt","outcome":"failure"}
{"timestamp":"2023-04-23T12:36:10.123Z","logLevel":"INFO","serviceName":"AuthService","nodeId":"node4","message":"User login successful","clientIP":"192.168.1.15","username":"user456","event":"login_attempt","outcome":"success"}
{"timestamp":"2023-04-23T12:37:30.456Z","logLevel":"WARN","serviceName":"AuthService","nodeId":"node4","message":"Login delayed due to system load","clientIP":"192.168.1.20","username":"user789","event":"login_attempt","outcome":"delayed"}
In the output logs, the dynamicfield attribute shows a different value for each log (failure, success, and delayed, respectively), pulled from the outcome field within the log body.
Example Configuration 2
The following example shows how the node handles a log that matches the Log Transform node configuration and one that does not.
nodes:
  - name: log_transform_example
    type: log_transform
    transformations:
      - field_path: attributes.parsed
        operation: upsert
        value: json(item["body"])
      - field_path: attributes.original_timestamp
        operation: upsert
        value: item["attributes"]["parsed"]["timestamp"]
      - field_path: parsed_body
        operation: upsert
        value: item["attributes"]["parsed"]["msg"]
Example Logs
Matching Input
{
  "timestamp": "1581452773000000789",
  "body": "{\"timestamp\": \"2023/07/11 09:40:21\",\"msg\": \"Failed to do something\"}",
  "resource": {
    "host": "host-1",
    "tag": "app-dev",
    "__source": {
      "type": "K8s",
      "short_name": "short_source_name",
      "name": "source_name",
      "group_name": "group_name",
      "logical_name": "logical_source"
    },
    "k8s.namespace.name": "edgedelta",
    "k8s.pod.name": "api-deployment-d79fab72249c",
    "k8s.container.name": "echo:latest",
    "k8s.controller.logical_name": "Deployment",
    "k8s.labels.app": "my-api"
  },
  "type": "log",
  "attributes": {
    "pod_id": "api-deployment-d79fab72249c-vtq9x",
    "instance_id": "i-1234567890abcdef0",
    "instance_name": "test-name"
  }
}
Matching Output
{
  "timestamp": "1581452773000000789",
  "body": "Failed to do something",
  "resource": {
    "host": "host-1",
    "tag": "app-dev",
    "__source": {
      "type": "K8s",
      "short_name": "short_source_name",
      "name": "source_name",
      "group_name": "group_name",
      "logical_name": "logical_source"
    },
    "k8s.namespace.name": "edgedelta",
    "k8s.pod.name": "api-deployment-d79fab72249c",
    "k8s.container.name": "echo:latest",
    "k8s.controller.logical_name": "Deployment",
    "k8s.labels.app": "my-api"
  },
  "type": "log",
  "attributes": {
    "pod_id": "api-deployment-d79fab72249c-vtq9x",
    "instance_id": "i-1234567890abcdef0",
    "instance_name": "test-name",
    "parsed": {
      "timestamp": "2023/07/11 09:40:21",
      "msg": "Failed to do something"
    },
    "original_timestamp": "2023/07/11 09:40:21"
  }
}
Not Matching Input
{
  "timestamp": "1581452773000000789",
  "body": "hello world",
  "resource": {
    "host": "host-1",
    "tag": "app-dev",
    "src_type": "K8s",
    "__short_src_name": "short_source_name",
    "__src_name": "source_name",
    "__group_name": "group_name",
    "__logical_source": "logical_source",
    "k8s.namespace.name": "edgedelta",
    "k8s.pod.name": "api-deployment-d79fab72249c",
    "k8s.container.name": "echo:latest",
    "k8s.controller.logical_name": "Deployment",
    "k8s.labels.app": "my-api"
  },
  "type": "log",
  "attributes": {
    "pod_id": "api-deployment-d79fab72249c-vtq9x",
    "instance_id": "i-1234567890abcdef0",
    "instance_name": "test-name"
  }
}
Not Matching Output
{
  "timestamp": "1581452773000000789",
  "body": "",
  "resource": {
    "host": "host-1",
    "tag": "app-dev",
    "src_type": "K8s",
    "__short_src_name": "short_source_name",
    "__src_name": "source_name",
    "__group_name": "group_name",
    "__logical_source": "logical_source",
    "k8s.namespace.name": "edgedelta",
    "k8s.pod.name": "api-deployment-d79fab72249c",
    "k8s.container.name": "echo:latest",
    "k8s.labels.app": "my-api"
  },
  "type": "log",
  "attributes": {
    "pod_id": "api-deployment-d79fab72249c-vtq9x",
    "instance_id": "i-1234567890abcdef0",
    "instance_name": "test-name",
    "parsed": "",
    "original_timestamp": ""
  }
}
In the non-matching example, the log body is not in JSON format. Because JSON parsing of the log message fails, an empty string is assigned to the new fields. The body field, which stores the log message, is also replaced with an empty string, so the original log message is lost.
Required Parameters
name
A descriptive name for the node. This is the name that appears in Visual Pipelines, and you can use it to reference the node in the YAML. It must be unique across all nodes. It is a YAML list element, so it begins with a - and a space, followed by the string. It is a required parameter for all nodes.
nodes:
  - name: <node name>
    type: <node type>
type: log_transform
The type parameter specifies the type of node being configured. It is specified as a string from a closed list of node types. It is a required parameter.
nodes:
  - name: <node name>
    type: <node type>
Transformations
The transformations parameter specifies the log transformation operations. It consists of three child parameters:
- operation specifies the transformation operation. Currently it can be delete or upsert; upsert updates the field if it exists or adds it if it doesn't.
- field_path is the dot-separated path where the operation should be applied.
- value is the CEL expression that determines the value to be applied; you can use CEL macros. With the OTEL schema, the indexing method of CEL must be used, for example item["resource"]["host.name"], because OTEL has some fields that already contain dots in them.
nodes:
  - name: <node name>
    type: log_transform
    transformations:
      - operation: upsert|delete
        field_path: <dot separated path>
        value: <CEL expression>
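As a filled-in illustration of the template above, the sketch below renames an attribute by copying it with upsert and then deleting the original. The node name and the choice of fields are hypothetical. It assumes that transformations run in the order listed (as Example Configuration 2 suggests, where later entries read attributes.parsed) and that delete needs no value.

```yaml
nodes:
  - name: log_transform_rename_example   # hypothetical node name
    type: log_transform
    transformations:
      # Step 1: copy the existing attribute to the new name.
      - operation: upsert
        field_path: attributes.pod
        value: item["attributes"]["pod_id"]
      # Step 2: remove the old attribute.
      # Assumption: delete takes no value.
      - operation: delete
        field_path: attributes.pod_id
```

Bracket notation is used for the value even though pod_id contains no dots, which keeps the expression consistent with fields such as item["resource"]["host.name"] that require it.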