Design Effective Pipelines
Know your Data
To design an effective data handling pipeline, you should have a good understanding of the data your workloads generate. It is important to understand the structure and content of your logs, as well as whether they are homogeneous, that is, of the same type and structure. Gather a sample of the log structure you want to design a pipeline for. If your logs are not homogeneous, gather one sample for each distinct data structure. You will use these samples to test node and pipeline behavior. You may want to gather two or three logs of each structure to get a sense of the range of values they may contain.
Consider the following set of logs that, for the purposes of this discussion, emanate from a single pipeline source:
{"timestamp": "2024-09-16T01:55:00.745903Z", "logLevel": "ERROR", "serviceName": "APIGateway", "nodeId": "node4", "message": "Incorrect password, user failed to authenticate.", "clientIP": "192.168.1.202", "username": "user982", "event": "login_failed", "outcome": "failure"}
{"timestamp": "2024-09-16T01:54:51.192456Z", "logLevel": "INFO", "serviceName": "DataService", "nodeId": "node3", "message": "The user has logged in successfully.", "clientIP": "192.168.1.166", "username": "user209", "event": "user_logged_in", "outcome": "success"}
{"timestamp": "2024-09-16T01:54:48.346160Z", "logLevel": "WARN", "serviceName": "InventoryService", "nodeId": "node2", "message": "Authentication process is taking longer than expected.", "clientIP": "192.168.1.248", "username": "user316", "event": "login_delayed", "outcome": "delayed"}
Understand the Ingestion Metadata
When logs are ingested into the pipeline, the entire log becomes the body and metadata is added to the log to build an OTEL data item.
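For illustration, a data item built from the first sample log might look roughly like the following sketch. The exact metadata fields and formats depend on the source node and environment; the ed.filepath value is only a placeholder, and the timestamp is shown as epoch milliseconds purely for illustration:
{
  "_type": "log",
  "timestamp": 1726451700745,
  "resource": {
    "ed.filepath": "/var/log/apigateway.log"
  },
  "attributes": {},
  "body": "{\"timestamp\": \"2024-09-16T01:55:00.745903Z\", \"logLevel\": \"ERROR\", \"serviceName\": \"APIGateway\", \"nodeId\": \"node4\", \"message\": \"Incorrect password, user failed to authenticate.\", \"clientIP\": \"192.168.1.202\", \"username\": \"user982\", \"event\": \"login_failed\", \"outcome\": \"failure\"}"
}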
Bear in mind that the OTEL source node attempts to use the incoming OTEL log fields.
See more examples of the data items. To understand how data is escaped, see Understand Escaping Characters.
Know your Requirements
To design effective log and metric pipelines, you must have a comprehensive understanding of the data handling requirements. These include business-driven factors such as cost-efficiency and adherence to legal mandates, data-specific needs such as volume capacity and optimization of data throughput, information security, and maintainability.
For the purposes of this document, the first log (the node4 log) should be enriched with a dynamic field based on the value of a field in the JSON body called outcome.
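In other words, after processing, the node4 sample should carry a new attribute that mirrors the outcome value from its body. A simplified sketch of the relevant fields, with the other ingestion metadata omitted:
Before processing: "attributes": {}
After processing:  "attributes": {"outcome": "failure"}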
Pipeline Conceptual Design
Create a rough or conceptual pipeline containing the nodes whose functions fulfill the requirements. Consider the sequence of nodes and opportunities for branching the pipeline into paths. Develop a high-level understanding of what your data should look like as it progresses through the pipeline to meet your requirements. For example, the first node might mask a specific field, while the next might extract a field from the body and convert it into an attribute. A parallel path might be required to also generate metrics or trigger alerts against a threshold. Also consider the data format requirements of the destination.
Assume for this example that the log samples come from a single source node. Therefore, data needs to be routed on separate downstream paths to the respective processors (or, in a real-world application, to a series of processors or perhaps a compound node). From there, data will be piped on to one or more outputs.
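A rough sketch of this conceptual design might look like the following, where the node and path names are placeholders rather than required names:
source --> route --+--> log transform (node4 logs) --> output(s)
                   +--> processor(s) (node3 logs)  --> output(s)
                   +--> processor(s) (node2 logs)  --> output(s)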
Example Pipeline Configuration
Now create a pipeline based on the conceptual design. To start, a Route node needs to be configured. In this scenario, the node4 keyword will be used to route node4 logs to the appropriate processor that will fulfill the requirement.
- Click Edit Mode.
- Click Processors, expand Filters, and select Route.
- Click Add New in the Paths section.
- Specify a path and regex_match CEL macro such as the following to match the keyword node4:
- name: route
  type: route
  paths:
    - path: log_transform
      condition: regex_match(item["body"], "node4")
The Route node has a condition that leverages CEL macros and an understanding of how to reference fields. You would add other paths and conditions to the route node to cater for the other log structures on the same pipeline, such as the node2 and node3 logs, as per their requirements.
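For example, the route node might grow into something like the following sketch, where the additional path names and keyword conditions are illustrative only:
- name: route
  type: route
  paths:
    - path: log_transform
      condition: regex_match(item["body"], "node4")
    - path: node3_processing
      condition: regex_match(item["body"], "node3")
    - path: node2_processing
      condition: regex_match(item["body"], "node2")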
- Connect the route node to a source.
Test Driven Configuration
A test-driven approach to configuration can be used. In this example, the Log Transform node will be used to meet the requirement:
- Enrich each log with an attribute whose value is the value of the outcome field in the body.
Suppose you want the new field to be located at attributes.outcome.
- Click Edit Mode, click Add Processor, expand Transformations, and select Log Transform.
- Click Save Changes to close the node for now.
- Connect the Log Transform node to the log_transform path of the route node.
- Connect the Log Transform node output to the ed_logs_output node.
- Open the Log Transform node.
- Paste the log samples above into the Samples pane.
Ensure that Kubernetes_input is selected as the source node so that the routing logic of the preceding route node is also tested.
- Click Add New in the Transformation section.
- Enter attributes.outcome in the Field Path field.
- Select Upsert from the Operation list.
- Click Open CEL Library in the Value field to open the CEL macro builder.
- Select json.
- Click Copy CEL Expression then click Cancel.
- Paste the copied expression into the Value field.
json(item["body"]).file.path
- Click Process Samples.
The Inbound data and Outbound data panes are populated with samples of the expected input and output.
Note a few things:
- The inbound and outbound data include the parameters that would be added at ingestion time, such as resource, _type, and timestamp.
- The node4 log has been listed in the Inbound data pane. This indicates that the route node is correctly sending only the appropriate data to this node. All the other samples pasted into the test pane have been ignored.
- The attributes.outcome field has been added, but it is incorrectly configured. You need to point the CEL macro to the correct field within the log body.
- Delete file.path and replace it with outcome in the Value field:
json(item["body"]).outcome
The Outbound data test pane now shows a data item that is conformant with the requirements. This indicates that the node is correctly configured, so click Save Changes before completing the configuration and deploying the pipeline.
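For reference, the completed transformation corresponds to a configuration along the following lines. This is a sketch that reuses the field_path, operation, and value pattern from the examples in the next section; the exact node schema may differ:
- field_path: item["attributes"]["outcome"]
  operation: upsert
  value: json(item["body"]).outcome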
Efficient Pipelines
Managing computational cost is vital to ensure the Fleet’s performance and overall cost-effectiveness within an edge computing environment. Try to use pipeline configurations that are computationally less expensive than alternatives that perform the same function. For example, consider this transformation configuration:
- field_path: item["attributes"]["pod_name"]
operation: upsert
value: from_k8s(regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"], "k8s.pod.name")
- field_path: item["attributes"]["pod_namespace"]
operation: upsert
value: from_k8s(regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"], "k8s.namespace.name")
In this configuration, regex_capture is called twice.
Now consider this version:
- field_path: item["attributes"]["pod_id"]
operation: upsert
value: regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"]
- field_path: item["attributes"]["pod_name"]
operation: upsert
value: from_k8s(item["attributes"]["pod_id"], "k8s.pod.name")
- field_path: item["attributes"]["pod_namespace"]
operation: upsert
value: from_k8s(item["attributes"]["pod_id"], "k8s.namespace.name")
- Fewer Regex Operations: Only one regex_capture call is made in the efficient configuration, as opposed to two in the inefficient configuration. Since regex operations can be costly, minimizing their usage can lead to considerable performance improvements.
- Reusing Extracted Data: The pod_id is extracted once and reused multiple times, which streamlines the data transformation process and reduces redundancy.
- Optimized API Calls: With fewer steps involved in data transformation, the API interactions, particularly with Kubernetes, become more efficient. This leads to faster processing times and lower latency.
See the CEL Macro page and the Designing Efficient Pipelines page for the computational expense of each CEL macro.