Handle Node Processing Failures
5 minute read
Overview
In this tutorial you will update the test bench to handle node failures. Some nodes contain more than one output. If the node is unable to process a log as expected it is output on a fallback path labelled Failure or Unmatched. In the previous tutorial you created a route node that routed logs on one path if they contained the text PaymentService
in the log body. All other logs that don’t contain that text were sent to an unmatched path. In this tutorial you will add a second path to the route node and also connect the unmatched path to an output in order to monitor whether logs are being routed appropriately. In addition you will test individual processors using the Visual Pipelines.
Prerequisites
Before starting this tutorial, complete the steps in Tutorial 1: Create a Test Bench and Tutorial 2: Test a Pipeline
Scenario
Suppose you have two systems generating logs: PaymentService and AuthService
. Both need to end up in the ed_archive
but AuthService
generates logs containing a username that needs to be masked before ingestion in the archive. So you decide to add a path to the route node to route AuthService
logs on a second pipeline branch that includes a mask node. However, you also want to ensure that your route node configuration is working by monitoring the unmatched path.
These are examples of logs being generated:
{"timestamp": "2024-01-25T12:21:02.821442Z", "logLevel": "DEBUG", "service": "PaymentService", "records": [{"data": {"user": "user194", "action": "data_update", "details": "DataSyncTask debug: Synced 1500 rows from AnalyticsDB to ReportingDB"}}], "additionalInfo": {"clientIP": "192.168.1.182", "sessionID": "633aa6c1-fb36-41a4-9399-455e5a09a48d", "transactionID": "trx314809"}}
{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Raider293", "event": "user_logged_in", "outcome": "success"}
2024-01-25T09:25:34.051387Z WARN service=UserManagementService host.name=queue-east-714.stage message='User profile update operations are taking longer than expected' userID=user993 transactionID=trx831492
Notice the third log is from another service called
UserManagementService
.
Add a second Route path
- In the Edge Delta App, click Pipelines.
- Select the
testbench
fleet and click View/Edit Pipeline. - Click Edit Mode.
- Double-click the
route_test
node. - Click Add New in the Paths section.
- Enter
AuthService_Path
in the Path field. - Specify the following CEL macro in the Condition field:
regex_match(item["body"], "AuthService")
- Click Test Node.
- Copy the example logs and paste them in the Paste log data field:
{"timestamp": "2024-01-25T12:21:02.821442Z", "logLevel": "DEBUG", "service": "PaymentService", "records": [{"data": {"user": "user194", "action": "data_update", "details": "DataSyncTask debug: Synced 1500 rows from AnalyticsDB to ReportingDB"}}], "additionalInfo": {"clientIP": "192.168.1.182", "sessionID": "633aa6c1-fb36-41a4-9399-455e5a09a48d", "transactionID": "trx314809"}}
{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Raider293", "event": "user_logged_in", "outcome": "success"}
2024-01-25T09:25:34.051387Z WARN service=UserManagementService host.name=queue-east-714.stage message='User profile update operations are taking longer than expected' userID=user993 transactionID=trx831492
- Open the Processor tab and click Test Processor.
- Expand each Node Route in the Outgoing Data Items text box to ensure that each log is routed appropriately, with the third log routing to the unmatched fallback path.
- Click OK
Add a Mask Node
This is an example of a log generated by AuthService
.
{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Test user", "event": "user_logged_in", "outcome": "success"}
A Golang regex pattern that would identify the username field in logs like this is "username": "([^"]*)"
.
- Click Add Processor, expand Filters and select Mask Processor.
- Specify a Name for the node
mask_test
. - Enter the pattern in the Pattern field:
"username": "([^"]*)"
- Click Test Node.
- Copy the example log and paste it in the Paste log data field:
{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Test user", "event": "user_logged_in", "outcome": "success"}
- Open the Processor tab and click Test Processor.
- Examine the Outgoing Data Items text box to ensure that username field in the test log is masked appropriately.
- Click OK
Create a Fallback Output
In this step you create a local storage destination that will ingest all unmatched logs from the route
node. These are logs that match neither the AuthService_Path
nor the PaymentService_Path
. It will store these logs in a local on-cluster volume that you configured in Tutorial 1: Create a Test Bench.
- Click Add Destination, expand Archive and select Local Storage Destination.
- Enter
Route_Unmatched
in the Name field - Enter
/mnt/outputfile/route/logs
in the Mounted Path field - Select Uncompressed from the Compression list.
- Click OK.
Connect the new nodes
- Connect the Route node’s AuthService_Path to the Mask node’s input.
- Connect the Mask node’s output to the
ed_archive
node. - Connect the Route node’s Unmatched output to the
Route_Unmatched
destination node. - Click Review Changes.
- Click Deploy Changes.
Test the pipeline
- In a terminal, navigate to the local mapped volume of the input file, such as
/Users/path/testbench/inputlogs
. You configured this location when you created a cluster definition in Tutorial 1: Create a Test Bench. - Echo the following test messages into the input file:
echo "{"timestamp": "2024-01-25T12:21:02.821442Z", "logLevel": "DEBUG", "service": "PaymentService", "records": [{"data": {"user": "user194", "action": "data_update", "details": "DataSyncTask debug: Synced 1500 rows from AnalyticsDB to ReportingDB"}}], "additionalInfo": {"clientIP": "192.168.1.182", "sessionID": "633aa6c1-fb36-41a4-9399-455e5a09a48d", "transactionID": "trx314809"}}" >> testbench_input_file.log
echo "{"timestamp": "2024-01-25T12:22:27.947573Z", "logLevel": "INFO", "serviceName": "AuthService", "message": "The user has logged in successfully.", "username": "Raider293", "event": "user_logged_in", "outcome": "success"}" >> testbench_input_file.log
echo "2024-01-25T09:25:34.051387Z WARN service=UserManagementService host.name=queue-east-714.stage message='User profile update operations are taking longer than expected' userID=user993 transactionID=trx831492" >> testbench_input_file.log
This appends the logs one at a time to the file input.
- View the
testbench_input_file.log
file to see the logs in the input file:
cat testbench_input_file.log
- Navigate to the local mapped volume of the
Route_Unmatched
node, such as/Users/path/testbench/fails/route
. You configured this location when you created a cluster definition in Tutorial 1: Create a Test Bench.
There should be two files: A .log file containing the third log, and a JSON file containing metadata about the log that was unmatched.
- After a few minutes, click Logs in the Edge Delta app.
The new
PaymentService
log should be visible, and theAuthService
should be visible but with a redacted username.