Handle Node Processing Failures

This tutorial tests upgrades the test bench to handle node failures.

Overview

In this tutorial you will update the test bench to handle node failures. Some nodes contain more than one output. If the node is unable to process a log as expected it is output on a fallback path labelled Failure or Unmatched. In the previous tutorial you created a route node that routed logs on one path if they contained the text PaymentService in the log body. All other logs that don’t contain that text were sent to an unmatched path. In this tutorial you will add a second path to the route node and also connect the unmatched path to an output in order to monitor whether logs are being routed appropriately. In addition you will test individual processors using the Visual Pipelines.

Prerequisites

Before starting this tutorial, complete the steps in Tutorial 1: Create a Test Bench and Tutorial 2: Test a Pipeline

Scenario

Suppose you have two systems generating logs: PaymentService and AuthService. Both need to end up in the ed_archive but AuthService generates logs containing a username that needs to be masked before ingestion in the archive. So you decide to add a path to the route node to route AuthService logs on a second pipeline branch that includes a mask node. However, you also want to ensure that your route node configuration is working by monitoring the unmatched path.

These are examples of logs being generated:

{"timestamp": "2024-01-25T12:21:02.821442Z", "logLevel": "DEBUG", "service": "PaymentService", "records": [{"data": {"user": "user194", "action": "data_update", "details": "DataSyncTask debug: Synced 1500 rows from AnalyticsDB to ReportingDB"}}], "additionalInfo": {"clientIP": "192.168.1.182", "sessionID": "633aa6c1-fb36-41a4-9399-455e5a09a48d", "transactionID": "trx314809"}}
{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Raider293", "event": "user_logged_in", "outcome": "success"}
2024-01-25T09:25:34.051387Z WARN service=UserManagementService host.name=queue-east-714.stage message='User profile update operations are taking longer than expected' userID=user993 transactionID=trx831492

Notice the third log is from another service called UserManagementService.

Add a second Route path

  1. In the Edge Delta App, click Pipelines - Pipelines.
  2. Select the testbench pipeline.
  3. Click Edit Mode.
  4. Double-click the route_test node.
  5. Click Add New in the Paths section.
  6. Enter AuthService_Path in the Path field.
  7. Specify the following CEL macro in the Condition field:
regex_match(item["body"], "AuthService")
  1. Click Test Node.
  2. Copy the example logs and paste them in the Paste log data field:
{"timestamp": "2024-01-25T12:21:02.821442Z", "logLevel": "DEBUG", "service": "PaymentService", "records": [{"data": {"user": "user194", "action": "data_update", "details": "DataSyncTask debug: Synced 1500 rows from AnalyticsDB to ReportingDB"}}], "additionalInfo": {"clientIP": "192.168.1.182", "sessionID": "633aa6c1-fb36-41a4-9399-455e5a09a48d", "transactionID": "trx314809"}}
{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Raider293", "event": "user_logged_in", "outcome": "success"}
2024-01-25T09:25:34.051387Z WARN service=UserManagementService host.name=queue-east-714.stage message='User profile update operations are taking longer than expected' userID=user993 transactionID=trx831492
  1. Open the Processor tab and click Test Processor.
  2. Expand each Node Route in the Outgoing Data Items text box to ensure that each log is routed appropriately, with the third log routing to the unmatched fallback path.
  3. Click OK

Add a Mask Node

This is an example of a log generated by AuthService. {"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Test user", "event": "user_logged_in", "outcome": "success"} A Golang regex pattern that would identify the username field in logs like this is "username": "([^"]*)".

  1. Click Add Processor, expand Filters and select Mask Processor.
  2. Specify a Name for the node mask_test.
  3. Enter the pattern in the Pattern field:
"username": "([^"]*)"
  1. Click Test Node.
  2. Copy the example log and paste it in the Paste log data field:
{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Test user", "event": "user_logged_in", "outcome": "success"}
  1. Open the Processor tab and click Test Processor.
  2. Examine the Outgoing Data Items text box to ensure that username field in the the test log is masked appropriately.
  3. Click OK

Create a Fallback Output

In this step you create a local storage output that will ingest all unmatched logs from the route node. These are logs that match neither the AuthService_Path nor the PaymentService_Path. It will store these logs in a local on-cluster volume that you configured in Tutorial 1: Create a Test Bench.

  1. Click Add Output, expand Archive and select Local Storage Output.
  2. Enter Route_Unmatched in the Name field
  3. Enter /mnt/outputfile/route/logs in the Mounted Path field
  4. Select Uncompressed from the Compression list.
  5. Click OK.

Connect the new nodes

  1. Connect the Route node’s AuthService_Path to the Mask node’s input.
  2. Connect the Mask node’s output to the ed_archive node.
  3. Connect the Route node’s Unmatched output to the Route_Unmatched output node.
  4. Click Review Changes.
  5. Click Deploy Changes.

Test the pipeline

  1. In a terminal, navigate to the local mapped volume of the input file, such as /Users/path/testbench/inputlogs. You configured this location when you created a cluster definition in Tutorial 1: Create a Test Bench.
  2. Echo the following test messages into the input file:
echo "{"timestamp": "2024-01-25T12:21:02.821442Z", "logLevel": "DEBUG", "service": "PaymentService", "records": [{"data": {"user": "user194", "action": "data_update", "details": "DataSyncTask debug: Synced 1500 rows from AnalyticsDB to ReportingDB"}}], "additionalInfo": {"clientIP": "192.168.1.182", "sessionID": "633aa6c1-fb36-41a4-9399-455e5a09a48d", "transactionID": "trx314809"}}" >> testbench_input_file.log
echo "{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Raider293", "event": "user_logged_in", "outcome": "success"}" >> testbench_input_file.log
echo "2024-01-25T09:25:34.051387Z WARN service=UserManagementService host.name=queue-east-714.stage message='User profile update operations are taking longer than expected' userID=user993 transactionID=trx831492" >> testbench_input_file.log

This appends the logs one at a time to the file input.

  1. View the testbench_input_file.log file to see the logs in the input file:
cat testbench_input_file.log
  1. Navigate to the local mapped volume of the Route_Unmatched node, such as /Users/path/testbench/fails/route. You configured this location when you created a cluster definition in Tutorial 1: Create a Test Bench.

There should be two files: A .log file containing the third log, and a json file containing metadata about the log that was unmatched.

  1. After a few minutes, click Logs in the Edge Delta app. The new PaymentService log should be visible, and the AuthService should be visible but with a redacted username.