Source Detection with Edge Delta
Automatically detect data sources when installing a Fleet.
To design an effective data handling pipeline, you should have a good understanding of the data your workloads generate. It is important to understand its structure and content, as well as whether it is homogeneous, that is, of the same type and structure. Gather a sample of the log structure you want to design a pipeline for; if your logs are not homogeneous, gather one sample for each distinct data structure. You will use these samples to test node and pipeline behavior. Consider gathering two or three logs of each structure to get a sense of the range of values they may contain.
Consider the following set of logs that, for the purposes of this discussion, emanate from a single pipeline source:
{"timestamp": "2024-09-16T01:55:00.745903Z", "logLevel": "ERROR", "serviceName": "APIGateway", "nodeId": "node4", "message": "Incorrect password, user failed to authenticate.", "clientIP": "192.168.1.202", "username": "user982", "event": "login_failed", "outcome": "failure"}
{"timestamp": "2024-09-16T01:54:51.192456Z", "logLevel": "INFO", "serviceName": "DataService", "nodeId": "node3", "message": "The user has logged in successfully.", "clientIP": "192.168.1.166", "username": "user209", "event": "user_logged_in", "outcome": "success"}
{"timestamp": "2024-09-16T01:54:48.346160Z", "logLevel": "WARN", "serviceName": "InventoryService", "nodeId": "node2", "message": "Authentication process is taking longer than expected.", "clientIP": "192.168.1.248", "username": "user316", "event": "login_delayed", "outcome": "delayed"}
When logs are ingested into the pipeline, the entire log becomes the body and metadata is added to the log to build an OTEL data item.
Bear in mind that the OTEL source node attempts to use the incoming OTEL log fields.
See more examples of the data items. To understand how data is escaped, see Understand Escaping Characters.
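For illustration only, the first sample log might be represented as a data item along the following lines once ingested. The resource fields, timestamp format, and attribute values shown here are assumptions made for this sketch; the actual content depends on the source node and environment:
{
  "_type": "log",
  "timestamp": 1726451700745,
  "resource": {
    "ed.filepath": "/var/lib/kubelet/pods/..."
  },
  "attributes": {},
  "body": "{\"timestamp\": \"2024-09-16T01:55:00.745903Z\", \"logLevel\": \"ERROR\", \"serviceName\": \"APIGateway\", ...}"
}
The entire original log is carried as an escaped string in the body, while fields such as resource, _type, and timestamp are added as metadata.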
To design effective log and metric pipelines, you must have a comprehensive understanding of the data handling requirements. These include business-driven factors such as cost-efficiency and adherence to legal mandates, data-specific needs such as volume capacity and optimization of data throughput, information security, and maintainability.
For the purposes of this document, the first log (node4 logs) should be enriched with a dynamic field based on the value of a field in the JSON body called outcome.
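In other words, after processing, the data item for the first sample log should carry an attribute derived from the outcome value in its body. A sketch of the relevant fragment only (the exact attribute location is chosen later in this walkthrough):
{
  "attributes": {
    "outcome": "failure"
  }
}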
Create a rough or conceptual pipeline containing the nodes whose functions fulfill the requirements. Consider the sequence of nodes and opportunities for branching the pipeline into paths. Develop a high-level understanding of what your data should look like as it progresses through the pipeline to meet your requirements. For example, the first node might mask a specific field, while the next might extract a field from the body and convert it into an attribute. A parallel path might be required to also generate metrics or trigger alerts against a threshold. Also consider the data format requirements of the destination.
Assume for this example that the log samples come from a single source node. Data therefore needs to be routed along separate downstream paths to the appropriate processors (or, in a real-world application, to a series of processors or perhaps a compound node). From there, data is piped on to one or more outputs.
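One way to summarize the conceptual design for this example is shown below. This is a planning sketch only, not Edge Delta configuration syntax; the processor names are illustrative:
# Planning sketch only - not a deployable configuration
source: kubernetes_input                # emits all three log structures
routing:
  node4_logs: log_transform             # enrich with an attribute based on the outcome field
  other_logs: additional_processors     # node2 and node3 handling, not covered here
destinations:
  - one or more output nodes            # formatted per each destination's requirements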
Now create a pipeline based on the conceptual design. To start, a Route node needs to be configured. In this scenario, the node4 keyword will be used to route node4 logs to the appropriate processor that will fulfill the requirement.
- name: route
  type: route
  paths:
  - path: log_transform
    condition: regex_match(item["body"], "node4")
The Route node's condition leverages CEL macros and an understanding of how to reference fields. You would add other paths and conditions to the Route node to cater for other log structures on the same pipeline, such as node2 and node3 logs, as per their requirements.
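For example, paths for the other sample log structures could be added to the same Route node as follows. The additional path names are illustrative, and each would lead to its own downstream processor:
- name: route
  type: route
  paths:
  - path: log_transform
    condition: regex_match(item["body"], "node4")
  - path: node3_processing
    condition: regex_match(item["body"], "node3")
  - path: node2_processing
    condition: regex_match(item["body"], "node2")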
See the Knowledge Library.
A test-driven approach to configuration can be used. In this example, the Log Transform node will be used to meet the requirement: enrich node4 logs with a dynamic field based on the value of the outcome field in the body. Suppose you want the new field to be located at attributes.outcome.
Paste the log samples you gathered into the test pane, and ensure that Kubernetes_input is selected as the source node. This ensures that the routing logic of the preceding Route node is also tested. Enter attributes.outcome in the Field Path field, with json(item["body"]).file.path as the initial value in the Value field.
The Inbound data and Outbound data panes are populated with samples of the expected input and output. Note a few things:
- Metadata has been added to the data item, including resource, _type and timestamp fields.
- Only the node4 log is listed in the Inbound data pane. This indicates that the Route node is correctly sending only the appropriate data to this node; all the other samples pasted into the test pane have been ignored.
- The attributes.outcome field has been added, but it is incorrectly configured. You need to point the CEL macro to the correct field within the log body.
To fix this, remove file.path and replace it with outcome in the Value field: json(item["body"]).outcome
The Outbound data test pane now shows a data item that conforms to the requirements. This indicates that the node is correctly configured, so click Save Changes before completing the configuration and deploying the pipeline.
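For reference, the saved node roughly corresponds to a configuration like the following. This is a sketch assuming a log_transform node with a single upsert transformation; the node name is illustrative, and the field path is written in the bracketed form used later on this page:
- name: log_transform
  type: log_transform
  transformations:
  - field_path: item["attributes"]["outcome"]
    operation: upsert
    value: json(item["body"]).outcome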
See Use AI to create regex patterns and Use AI to create Grok patterns.
Managing computational cost is vital to ensure the Fleet’s performance and overall cost-effectiveness within an edge computing environment. Try to use pipeline configurations that are computationally less expensive than alternatives that perform the same function. For example, consider this transformation configuration:
- field_path: item["attributes"]["pod_name"]
operation: upsert
value: from_k8s(regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"], "k8s.pod.name")
- field_path: item["attributes"]["pod_namespace"]
operation: upsert
value: from_k8s(regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"], "k8s.namespace.name")
In this configuration, regex_capture is called twice.
Now consider this version:
- field_path: item["attributes"]["pod_id"]
operation: upsert
value: regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"]
- field_path: item["attributes"]["pod_name"]
operation: upsert
value: from_k8s(item["attributes"]["pod_id"], "k8s.pod.name")
- field_path: item["attributes"]["pod_namespace"]
operation: upsert
value: from_k8s(item["attributes"]["pod_id"], "k8s.namespace.name")
Note a few things:
- Only one regex_capture call is made in the efficient configuration, as opposed to two in the inefficient configuration. Since regex operations can be costly, minimizing their usage can lead to considerable performance improvements.
- The pod_id is extracted once and reused multiple times, which streamlines the data transformation process and reduces redundancy.
See the CEL Macro page and the Designing Efficient Pipelines page for the computational expense of each CEL macro.