Design Effective Pipelines

Design and build effective pipelines by testing.

Overview

You can test a node, together with the nodes that precede it in the pipeline, using your own data to ensure that the pipeline processes logs and metrics as expected.

Know your Data

To design an effective data handling pipeline, you should have a good understanding of the data your workloads generate. It is important to understand its structure and content, and whether it is homogeneous, that is, of the same type and structure. Gather a sample of the log structure you want to design a pipeline for. If your logs are not homogeneous, gather one sample for each distinct data structure. You will use these samples to test node and pipeline function. Consider gathering two or three logs of each structure to get a sense of the range of values they may contain.

Consider the following set of logs that, for the purposes of this discussion, emanate from a single pipeline source:

{"timestamp": "2024-09-16T01:55:00.745903Z", "logLevel": "ERROR", "serviceName": "APIGateway", "nodeId": "node4", "message": "Incorrect password, user failed to authenticate.", "clientIP": "192.168.1.202", "username": "user982", "event": "login_failed", "outcome": "failure"}

{"timestamp": "2024-09-16T01:54:51.192456Z", "logLevel": "INFO", "serviceName": "DataService", "nodeId": "node3", "message": "The user has logged in successfully.", "clientIP": "192.168.1.166", "username": "user209", "event": "user_logged_in", "outcome": "success"}

{"timestamp": "2024-09-16T01:54:48.346160Z", "logLevel": "WARN", "serviceName": "InventoryService", "nodeId": "node2", "message": "Authentication process is taking longer than expected.", "clientIP": "192.168.1.248", "username": "user316", "event": "login_delayed", "outcome": "delayed"}

Understand the Ingestion Metadata

When logs are ingested into the pipeline, the entire log becomes the body and metadata is added to the log to build an OTEL data item.

Bear in mind that the OTEL source node attempts to use the incoming OTEL log fields.
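
For illustration, the first sample log above might be wrapped into a data item similar to the following sketch. The body is the escaped original log (abbreviated here), the timestamp is shown as epoch milliseconds, and the resource values are placeholders; the exact metadata fields and formats depend on the source and environment:

{
  "_type": "log",
  "timestamp": 1726451700745,
  "body": "{\"timestamp\": \"2024-09-16T01:55:00.745903Z\", \"logLevel\": \"ERROR\", \"serviceName\": \"APIGateway\", \"nodeId\": \"node4\", ... \"outcome\": \"failure\"}",
  "resource": {
    "ed.filepath": "/var/log/example/apigateway.log",
    "host.name": "example-host"
  },
  "attributes": {}
}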

See more examples of the data items. To understand how data is escaped, see Understand Escaping Characters.

Know your Requirements

To design effective log and metric pipelines, you must have a comprehensive understanding of the data handling requirements. These include business-driven factors such as cost-efficiency and adherence to legal mandates, data-specific needs such as volume capacity and optimization of data throughput, information security, and maintainability.

For the purposes of this document, the first log (the node4 log) should be enriched with a dynamic attribute whose value is taken from the outcome field in the JSON body.
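
In other words, given the first sample, whose body contains "outcome": "failure", the processed data item should gain an attribute along these lines (a sketch of the target, not a complete data item):

"attributes": { "outcome": "failure" }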

Pipeline Conceptual Design

Create a rough, conceptual pipeline containing the nodes whose functions fulfill the requirements. Consider the sequence of nodes and opportunities for branching the pipeline into paths. Develop a high-level understanding of what your data should look like as it progresses through the pipeline to meet your requirements. For example, the first node might mask a specific field, while the next might extract a field from the body and convert it into an attribute. A parallel path might be required to also generate metrics or trigger alerts against a threshold. Also consider the data format requirements of the destination.

Assume for this example that the log samples come from a single source node. Data therefore needs to be routed on separate downstream paths to the respective processors (or, in a real-world application, to a series of processors or perhaps a compound node). From there, data is piped on to one or more outputs.
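
As a sketch, the conceptual node list for this example could look something like the following. The route and log_transform types are the ones configured later in this document; the source and output type identifiers are assumptions, not definitive configuration:

# Conceptual node list only; node connections are defined separately.
- name: kubernetes_input     # source node
  type: kubernetes_input     # assumed type identifier
- name: route                # branches logs by structure (node4, node2, ...)
  type: route
- name: log_transform        # enriches node4 logs with attributes.outcome
  type: log_transform
- name: ed_logs_output       # output node
  type: ed_logs_output       # assumed type identifier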

Pipeline Configuration

To start, configure a Route node. In this scenario, the node4 keyword is used to route node4 logs to the processor that fulfills the requirement.

  1. Click Edit Mode.
  2. Click Processors, expand Filters, and select Route.
  3. Click Add New in the Paths section.
  4. Specify a path and regex_match CEL macro such as the following to match the keyword node4:
- name: route
  type: route
  paths:
  - path: log_transform
    condition: regex_match(item["body"], "node4")

You would add further paths and conditions to the route node to cater for the other log structures on the same pipeline, such as the node2 and node3 logs, according to their requirements.
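
For example, a second path for the node2 logs might be added like this (the node2_processing path name is illustrative):

- name: route
  type: route
  paths:
  - path: log_transform
    condition: regex_match(item["body"], "node4")
  - path: node2_processing
    condition: regex_match(item["body"], "node2")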

  5. Connect the route node to a source.

Test Driven Configuration

A test-driven approach to configuration can be used. In this example, the Log Transform node will be used to meet the requirement: enrich each log with an attribute whose value is taken from the outcome field in the body. Suppose you want the new field to be located at attributes.outcome.

  1. Click Edit Mode, click Add Processor, expand Transformations, and select Log Transform.
  2. Click Save Changes to close the node for now.
  3. Connect the Log Transform node to the log_transform path of the route node.
  4. Connect the Log Transform node output to the ed_logs_output node.
  5. Open the Log Transform node.
  6. Paste the log samples above into the Samples pane.

Make sure that Kubernetes_input is selected as the source node so that the routing logic of the preceding route node is also tested.

  7. Click Add New in the Transformation section.
  8. Enter attributes.outcome in the Field Path field.
  9. Select Upsert from the Operation list.
  10. Click Open CEL Library in the Value field to open the CEL macro builder.
  11. Select json.
  12. Click Copy CEL Expression then click Cancel.
  13. Paste the copied expression into the Value field:
json(item["body"]).file.path
  14. Click Process Samples.

The Inbound data and Outbound data panes are populated with samples of the expected input and output.

Note a few things:

  • The inbound and outbound data include the metadata that would be added at ingestion time, such as resource, _type, and timestamp.
  • The node4 log is listed in the Inbound data pane. This indicates that the route node is correctly sending only the appropriate data to this node; all the other samples pasted into the test pane have been ignored.
  • The attributes.outcome field has been added, but it is incorrectly configured. You need to point the CEL macro to the correct field within the log body.
  15. Delete file.path and replace it with outcome in the Value field:
json(item["body"]).outcome

The Outbound data test pane now shows a data item that conforms to the requirements. This indicates that the node is correctly configured, so you can click Save Changes before completing the configuration and deploying the pipeline.
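
As a rough illustration, the outbound item for the node4 sample should now carry the new attribute, along these lines (body abbreviated, resource values placeholders):

{
  "_type": "log",
  "timestamp": 1726451700745,
  "body": "{\"timestamp\": \"2024-09-16T01:55:00.745903Z\", \"logLevel\": \"ERROR\", ... \"outcome\": \"failure\"}",
  "resource": {
    "ed.filepath": "/var/log/example/apigateway.log",
    "host.name": "example-host"
  },
  "attributes": {
    "outcome": "failure"
  }
}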

Efficient Pipelines

Managing computational cost is vital to ensure the Fleet’s performance and overall cost-effectiveness within an edge computing environment. Try to use pipeline configurations that are computationally less expensive than alternatives that perform the same function. For example, consider this transformation configuration:

- field_path: item["attributes"]["pod_name"]
  operation: upsert
  value: from_k8s(regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"], "k8s.pod.name")
- field_path: item["attributes"]["pod_namespace"]
  operation: upsert
  value: from_k8s(regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"], "k8s.namespace.name")

In this configuration, the same regex_capture expression is evaluated twice against the same file path: once for pod_name and once for pod_namespace.

Now consider this version:

- field_path: item["attributes"]["pod_id"]
  operation: upsert
  value: regex_capture(item["resource"]["ed.filepath"], "/var/lib/kubelet/pods/(?P<id>(.+))/volumes.*")["id"]
- field_path: item["attributes"]["pod_name"]
  operation: upsert
  value: from_k8s(item["attributes"]["pod_id"], "k8s.pod.name")
- field_path: item["attributes"]["pod_namespace"]
  operation: upsert
  value: from_k8s(item["attributes"]["pod_id"], "k8s.namespace.name")

The second configuration is more efficient for several reasons:

  • Fewer Regex Operations: Only one regex_capture call is made in the efficient configuration, as opposed to two in the inefficient one. Since regex operations can be costly, minimizing their usage can lead to considerable performance improvements.
  • Reusing Extracted Data: The pod_id is extracted once and reused multiple times, which streamlines the data transformation process and reduces redundancy.
  • Optimized API Calls: With fewer steps involved in data transformation, the API interactions, particularly with Kubernetes, become more efficient. This leads to faster processing times and lower latency.

See the CEL Macro page and the Designing Efficient Pipelines page for the computational expense of each CEL macro.
