Edge Delta HTTP Workflow Source
Configure HTTP Workflow sources to orchestrate multi-step API requests with support for OAuth flows, conditional logic, iterators, and Redis caching.
15 minute read
Overview
HTTP Workflow chains multiple HTTP requests together, where each step can use data from previous responses. Use it when you need to:
- Authenticate first, then fetch data (OAuth flows)
- Get a list, then fetch details for each item (fan-out patterns)
- React to webhooks with multi-step processing
- Cache tokens or responses in Redis
Not sure which to use?
| Use Case | Source |
|---|---|
| Single API endpoint, simple polling | HTTP Pull |
| Multiple dependent requests, OAuth, webhooks | HTTP Workflow (this page) |
- Outgoing data types: log
- Minimum version: v2.8.0
Quick Start: Your First Workflow
Get a working workflow in under 2 minutes.
Simplest Example: Poll a Public API
nodes:
- name: github_status
type: http_workflow_input
workflow_pull_interval: 5m
steps:
- name: get_status
endpoint: https://www.githubstatus.com/api/v2/status.json
That’s it. This workflow:
- Runs every 5 minutes
- Fetches GitHub’s status API
- Emits the JSON response as a log item

Add Authentication
Most APIs require authentication. Add a header using a secret reference:
nodes:
- name: my_api_workflow
type: http_workflow_input
workflow_pull_interval: 5m
steps:
- name: fetch_data
endpoint: https://api.example.com/v1/data
headers:
Authorization: "Bearer {{ SECRET api_token }}"
Accept: application/json
The {{ SECRET api_token }} placeholder references a secret named api_token stored in Edge Delta's secrets management. Create secrets in the Edge Delta UI under Settings > Secrets.

Core Concepts
Before diving deeper, understand these three building blocks:
1. Triggers: What Starts the Workflow
Choose one trigger type:
| Trigger | When to Use | Example |
|---|---|---|
| workflow_pull_interval | Regular polling | workflow_pull_interval: 5m |
| workflow_pull_schedule | Specific times (cron) | workflow_pull_schedule: "0 9 * * 1-5" |
| webhook | React to external events | Incoming HTTP POST |
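As a sketch, the same single-step workflow under each trigger type looks like the following; the node names, endpoints, and secret name are placeholders:
nodes:
  # Option 1: poll on a fixed interval
  - name: interval_workflow
    type: http_workflow_input
    workflow_pull_interval: 5m
    steps:
      - name: fetch
        endpoint: https://api.example.com/v1/data
  # Option 2: run on a cron schedule (09:00, Monday through Friday)
  - name: scheduled_workflow
    type: http_workflow_input
    workflow_pull_schedule: "0 9 * * 1-5"
    steps:
      - name: fetch
        endpoint: https://api.example.com/v1/report
  # Option 3: react to incoming webhook events
  - name: webhook_workflow
    type: http_workflow_input
    webhook:
      path: /webhooks/events
      port: 8888
      auth_type: bearer
      bearer_token: "{{ SECRET webhook_bearer_token }}"
    steps:
      - name: process
        endpoint: https://api.example.com/v1/enrich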
2. Steps: The Requests in Your Workflow
Each step is an HTTP request. Steps run sequentially, and each step can access data from previous steps.
steps:
- name: step_one # First request
endpoint: https://api.example.com/auth
- name: step_two # Uses data from step_one
endpoint: https://api.example.com/data
header_expressions:
Authorization: Concat(["Bearer ", steps["step_one"].body["token"]], "")
3. Variables: Passing Data Between Steps
Access previous step results using these patterns:
| What You Need | Expression | Example |
|---|---|---|
| Response body field | steps["name"].body["field"] | steps["auth"].body["token"] |
| HTTP status code | steps["name"].status_code | steps["fetch"].status_code |
| Response header | steps["name"].headers["header"] | steps["auth"].headers["x-request-id"] |
| Global variable | variables["key"] | variables["api_base"] |
| Environment variable | EDXEnv("NAME", "default") | EDXEnv("API_KEY", "") |
Warning: All expressions using Concat, steps[], etc. must be written on a single line. Multi-line expressions will fail validation.
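For example, a two-step sketch combining these access patterns; the endpoints, the token field, the API_KEY environment variable, and the api_base variable are placeholders, not required names:
nodes:
  - name: expression_demo
    type: http_workflow_input
    workflow_pull_interval: 10m
    variables:
      api_base: https://api.example.com/v1
    steps:
      - name: login
        endpoint: https://auth.example.com/login
        method: POST
        # build the request body from an environment variable
        request_body_expression: Concat(["{\"api_key\":\"", EDXEnv("API_KEY", ""), "\"}"], "")
      - name: fetch_report
        # build the URL from a global variable
        endpoint_expression: Concat([variables["api_base"], "/reports"], "")
        header_expressions:
          # reuse the token returned by the previous step
          Authorization: Concat(["Bearer ", steps["login"].body["token"]], "")
        # only run if the login step returned HTTP 200
        run_condition: steps["login"].status_code == 200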
Common Patterns
Pattern 1: OAuth Token + Data Fetch
The most common pattern: get an OAuth token, then use it to fetch data.
nodes:
- name: oauth_workflow
type: http_workflow_input
workflow_pull_interval: 15m
# Redis for token caching (optional but recommended)
redis_address: "redis://localhost:6379"
# Global variables for reuse (secrets resolved at runtime)
variables:
client_id: "{{ SECRET oauth_client_id }}"
client_secret: "{{ SECRET oauth_client_secret }}"
steps:
# Step 1: Get OAuth token (cached for 50 minutes)
- name: get_token
endpoint: https://auth.example.com/oauth/token
method: POST
headers:
Content-Type: application/x-www-form-urlencoded
request_body_expression: Concat(["grant_type=client_credentials&client_id=", variables["client_id"], "&client_secret=", variables["client_secret"]], "")
redis_cache:
key_expression: '"oauth-token"'
ttl: 50m
check_before_request: true
populate_on_success: true
# Step 2: Fetch data using the token
- name: fetch_data
endpoint: https://api.example.com/v1/data
header_expressions:
Authorization: Concat(["Bearer ", steps["get_token"].body["access_token"]], "")

Why cache the token?
- OAuth tokens typically last 1 hour
- Caching for 50 minutes avoids re-authenticating every request
- If cache is empty or expired, the HTTP request runs automatically
Pattern 2: Webhook-Triggered Processing
Process incoming webhooks and enrich data from other APIs.
nodes:
- name: alert_webhook
type: http_workflow_input
webhook:
path: /webhooks/alerts
port: 8888
auth_type: bearer
bearer_token: "{{ SECRET webhook_bearer_token }}"
rate_limit_enabled: true
rate_limit_max_per_min: 100
steps:
# Access webhook payload with webhook_payload["field"]
- name: enrich_alert
endpoint_expression: Concat(["https://api.example.com/users/", webhook_payload["user_id"]], "")
headers:
Authorization: "Bearer {{ SECRET api_token }}"
- name: send_to_slack
endpoint: "https://hooks.slack.com/services/{{ SECRET slack_webhook_path }}"
method: POST
headers:
Content-Type: application/json
request_body_expression: Concat(["{\"text\":\"Alert from ", steps["enrich_alert"].body["username"], ": ", webhook_payload["message"], "\"}"], "")

Webhook authentication options:
| Auth Type | Configuration | Caller Sends |
|---|---|---|
| bearer | bearer_token: "{{ SECRET webhook_token }}" | Authorization: Bearer <token> |
| api_key | api_key: "{{ SECRET webhook_api_key }}", api_key_header: X-API-Key | X-API-Key: <key> |
| hmac | hmac_secret: "{{ SECRET hmac_secret }}", hmac_header: X-Signature | HMAC signature of body |
| none | (no config needed) | Nothing required |
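As a sketch, an HMAC-validated webhook trigger could look like the following; the path, secret name, and downstream endpoint are placeholders:
nodes:
  - name: hmac_webhook
    type: http_workflow_input
    webhook:
      path: /webhooks/deploys
      port: 8888
      auth_type: hmac
      hmac_secret: "{{ SECRET deploy_hmac_secret }}"
      hmac_header: X-Signature
    steps:
      - name: record_deploy
        endpoint: https://api.example.com/deploys
        method: POST
        # forward a field from the validated webhook payload
        request_body_expression: Concat(["{\"event\":\"", webhook_payload["event"], "\"}"], "")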
Pattern 3: Fan-Out (Get List, Then Details)
Fetch a list of items, then get details for each item in parallel.
nodes:
- name: user_details_workflow
type: http_workflow_input
workflow_pull_interval: 30m
steps:
# Step 1: Get list of users
- name: get_users
endpoint: https://api.example.com/users?limit=50
headers:
Authorization: "Bearer {{ SECRET api_token }}"
# Step 2: For each user, get their profile
- name: get_profile
endpoint_expression: Concat(["https://api.example.com/users/", iterator_item["id"], "/profile"], "")
headers:
Authorization: "Bearer {{ SECRET api_token }}"
is_last_step: true
iterator:
source: ParseJSON(steps["get_users"].body)["users"]
variable_name: iterator_item
max_parallel: 10
continue_on_error: true

Inside an iterator step, you can access:
| Variable | Description |
|---|---|
| iterator_item | Current item from the list |
| iterator_item["field"] | Field from current item |
| iterator_index | Zero-based index (0, 1, 2…) |
Warning: With max_parallel: 10, you'll make 10 concurrent requests. If the target API has rate limits, lower this value or add retry_http_code: [429] to handle rate limit responses.
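For example, a rate-limit-friendly variant of the fan-out above might lower max_parallel and retry throttled responses; the specific values are illustrative, not recommendations:
steps:
  - name: get_users
    endpoint: https://api.example.com/users?limit=50
    headers:
      Authorization: "Bearer {{ SECRET api_token }}"
  - name: get_profile
    endpoint_expression: Concat(["https://api.example.com/users/", iterator_item["id"], "/profile"], "")
    headers:
      Authorization: "Bearer {{ SECRET api_token }}"
    retry_http_code: [429]   # retry items that hit the rate limit
    is_last_step: true
    iterator:
      source: ParseJSON(steps["get_users"].body)["users"]
      variable_name: iterator_item
      max_parallel: 3          # stay under the API's concurrency budget
      continue_on_error: true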
Fan-Out with Per-Item Caching
Cache each item’s response to avoid redundant API calls on subsequent runs. If a user’s profile was fetched recently, skip the API call and use the cached version.
nodes:
- name: cached_fan_out
type: http_workflow_input
workflow_pull_interval: 30m
redis_address: "redis://localhost:6379"
steps:
- name: get_users
endpoint: https://api.example.com/users
headers:
Authorization: "Bearer {{ SECRET api_token }}"
- name: get_profile
endpoint_expression: Concat(["https://api.example.com/users/", iterator_item["id"], "/profile"], "")
headers:
Authorization: "Bearer {{ SECRET api_token }}"
is_last_step: true
iterator:
source: ParseJSON(steps["get_users"].body)["users"]
variable_name: iterator_item
max_parallel: 10
# Cache each user's profile by their ID
redis_cache:
key_expression: Concat(["user-profile-", iterator_item["id"]], "")
ttl: 1h
check_before_request: true # Check cache first
populate_on_success: true # Cache new responses
Result: on the first run, every user's profile is fetched from the API. On subsequent runs within the TTL, only new or changed users trigger API calls; cached profiles are reused.
Pattern 3b: Caching Expensive API Calls
Some APIs are rate-limited, slow, or have per-call costs. Cache responses to minimize calls.
nodes:
- name: expensive_api_workflow
type: http_workflow_input
workflow_pull_interval: 5m # Check every 5 minutes
redis_address: "redis://localhost:6379"
steps:
# This API costs $0.01 per call - cache for 1 hour
- name: fetch_expensive_data
endpoint: https://api.expensive-service.com/data
headers:
Authorization: "Bearer {{ SECRET expensive_api_key }}"
redis_cache:
key_expression: '"expensive-data-cache"'
ttl: 1h # Cache for 1 hour
check_before_request: true # Always check cache first
populate_on_success: true # Store successful responses
# This API is rate-limited to 10 req/min - cache for 5 minutes
- name: fetch_rate_limited
endpoint: https://api.strict-limits.com/metrics
headers:
Authorization: "Bearer {{ SECRET metrics_api_key }}"
redis_cache:
key_expression: '"metrics-cache"'
ttl: 5m
check_before_request: true
populate_on_success: true
Cost savings example:
| Scenario | API Calls/Day | Cost @ $0.01/call |
|---|---|---|
| No caching (every 5 min) | 288 | $2.88 |
| 1-hour cache | 24 | $0.24 |
| Savings | 264 calls | $2.64/day |
Pattern 4: Conditional Execution
Skip steps based on previous results.
steps:
- name: check_status
endpoint: https://api.example.com/status
# Only runs if status is "active"
- name: fetch_active_data
endpoint: https://api.example.com/active-data
run_condition: steps["check_status"].body["status"] == "active"
# Only runs if status is NOT "active"
- name: send_alert
endpoint: https://alerts.example.com/webhook
method: POST
run_condition: steps["check_status"].body["status"] != "active"
request_body: '{"message": "Service is not active"}'
Pattern 5: State Persistence (Markers/Cursors)
Persist state between workflow runs using Redis-only steps. Common use cases:
- Pagination cursors: Remember where you left off
- Timestamps: Only fetch records since last run
- Deduplication: Track processed record IDs
nodes:
- name: incremental_sync
type: http_workflow_input
workflow_pull_interval: 5m
redis_address: "redis://localhost:6379"
steps:
# Step 1: Read last processed timestamp from Redis (no HTTP call)
- name: get_marker
redis_cache:
key_expression: '"last-sync-timestamp"'
check_before_request: true
# Step 2: Fetch new records since the marker
- name: fetch_new_records
endpoint: https://api.example.com/records
parameter_expressions:
since: steps["get_marker"].body
limit: '"100"'
headers:
Authorization: "Bearer {{ SECRET api_token }}"
# Step 3: Save the new marker to Redis (no HTTP call)
- name: save_marker
redis_cache:
key_expression: '"last-sync-timestamp"'
value_expression: steps["fetch_new_records"].body["latest_timestamp"]
populate_on_success: true
ttl: 720h # 30 days
How Redis-only steps work:
| Step Type | Configuration | Behavior |
|---|---|---|
| Read marker | check_before_request: true, no endpoint | Reads value from Redis, available as steps["name"].body |
| Write marker | populate_on_success: true + value_expression, no endpoint | Writes computed value to Redis |
Note: Steps with redis_cache configured but no endpoint are Redis-only steps. They don't make HTTP calls; they only interact with Redis. Use value_expression to specify what to store.
Redis Caching
Cache responses to avoid redundant API calls. Essential for OAuth tokens.
Quick Setup
nodes:
- name: cached_workflow
type: http_workflow_input
workflow_pull_interval: 5m
# Global Redis connection
redis_address: "redis://localhost:6379"
redis_password: "{{ SECRET redis_password }}" # Optional
redis_tls: false
steps:
- name: cached_request
endpoint: https://api.example.com/data
redis_cache:
key_expression: '"my-cache-key"' # Cache key (must evaluate to string)
ttl: 10m # Cache duration
check_before_request: true # Check cache first
populate_on_success: true # Store response on success

Cache Configuration Reference
| Field | Type | Description |
|---|---|---|
| key_expression | string | OTTL expression for cache key |
| ttl | duration | Time-to-live (5m, 1h, 720h, etc.) |
| check_before_request | bool | If true, check cache before HTTP request |
| populate_on_success | bool | If true, cache response after successful request |
| value_expression | string | OTTL expression for value to store (required for Redis-only SET steps) |
| skip_remaining_on_hit | bool | If true, skip all remaining steps on cache hit |
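For example, skip_remaining_on_hit lets a cache hit short-circuit the rest of the workflow. A sketch, assuming redis_address is configured at the node level and using placeholder endpoints:
steps:
  - name: get_summary
    endpoint: https://api.example.com/summary
    redis_cache:
      key_expression: '"daily-summary"'
      ttl: 24h
      check_before_request: true
      populate_on_success: true
      skip_remaining_on_hit: true   # on a cache hit, the steps below do not run
  - name: rebuild_summary
    endpoint: https://api.example.com/summary/rebuild
    method: POST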
Complete Reference
Node-Level Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | Yes | - | Unique node identifier |
| type | string | Yes | - | Must be http_workflow_input |
| steps | array | Yes | - | List of HTTP request steps |
| workflow_pull_interval | duration | One trigger required | 1m | Polling interval |
| workflow_pull_schedule | string | One trigger required | - | Cron expression |
| webhook | object | One trigger required | - | Webhook configuration |
| global_timeout | duration | No | 2m | Max workflow execution time |
| emit_intermediate | bool | No | false | Emit logs for non-final steps |
| variables | map | No | - | Global variables for all steps |
| redis_address | string | No | - | Redis connection string |
| redis_password | string | No | - | Redis password |
| redis_tls | bool | No | false | Enable TLS for Redis |
| redis_db | int | No | 0 | Redis database number |
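A sketch that exercises most of the node-level options together; the schedule, timeouts, and Redis settings are illustrative:
nodes:
  - name: nightly_sync
    type: http_workflow_input
    workflow_pull_schedule: "0 2 * * *"   # run every night at 02:00
    global_timeout: 5m                     # abort the whole workflow after 5 minutes
    emit_intermediate: true                # also emit logs for non-final steps
    variables:
      api_base: https://api.example.com/v1
    redis_address: "redis://localhost:6379"
    redis_password: "{{ SECRET redis_password }}"
    redis_tls: false
    redis_db: 1
    steps:
      - name: fetch_export
        endpoint_expression: Concat([variables["api_base"], "/export"], "")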
Step-Level Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | Yes | - | Unique step identifier |
| endpoint | string | One required | - | Static URL |
| endpoint_expression | string | One required | - | Dynamic URL via OTTL |
| method | string | No | GET | HTTP method |
| headers | map | No | - | Static headers |
| header_expressions | map | No | - | Dynamic headers via OTTL |
| parameters | map | No | - | Query parameters |
| parameter_expressions | map | No | - | Dynamic query params via OTTL |
| request_body | string | No | - | Static request body |
| request_body_expression | string | No | - | Dynamic body via OTTL |
| request_timeout | duration | No | 30s | Timeout for this step |
| run_condition | string | No | - | OTTL condition to execute step |
| retry_http_code | array[int] | No | - | HTTP codes that trigger retry |
| iterator | object | No | - | Fan-out configuration |
| redis_cache | object | No | - | Caching configuration |
| is_last_step | bool | No | false | Mark as terminal step |
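A sketch of a single step mixing static and expression-based fields; the endpoint, parameter names, and the ED_REGION environment variable are placeholders:
steps:
  - name: search_events
    endpoint: https://api.example.com/v1/events
    method: GET
    parameters:
      limit: "100"                              # static query parameter
    parameter_expressions:
      region: EDXEnv("ED_REGION", "us-east-1")  # dynamic query parameter
    headers:
      Accept: application/json
    request_timeout: 10s                        # override the 30s default
    retry_http_code: [429, 503]                 # retry on throttling or transient errors
    is_last_step: true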
Webhook Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| path | string | Required | URL path (e.g., /webhooks/alerts) |
| port | int | 8080 | HTTP server port |
| auth_type | string | none | none, bearer, api_key, hmac |
| bearer_token | string | - | Token for bearer auth |
| api_key | string | - | Key for API key auth |
| api_key_header | string | X-API-Key | Header name for API key |
| hmac_secret | string | - | Secret for HMAC validation |
| hmac_header | string | X-Webhook-Signature | Header with HMAC signature |
| rate_limit_enabled | bool | false | Enable rate limiting |
| rate_limit_max_per_min | int | 60 | Max requests per minute |
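For example, a webhook block using API key auth plus rate limiting; the path, secret name, and limit are illustrative:
webhook:
  path: /webhooks/ingest
  port: 8080
  auth_type: api_key
  api_key: "{{ SECRET ingest_api_key }}"
  api_key_header: X-API-Key
  rate_limit_enabled: true
  rate_limit_max_per_min: 120   # reject callers above this rate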
Iterator Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| source | string | Required | OTTL expression returning array |
| variable_name | string | iterator_item | Name for current item |
| max_parallel | int | 10 | Concurrent iterations |
| continue_on_error | bool | false | Continue if iteration fails |
Security
Built-in SSRF Protection
HTTP Workflow blocks requests to:
- Localhost (127.0.0.1, ::1, localhost)
- Private IPs (10.x, 172.16-31.x, 192.168.x)
- Cloud metadata (169.254.169.254)
- Non-HTTP schemes (file://, ftp://)
Credential Best Practices
# DO: Use Edge Delta secrets (recommended)
headers:
Authorization: "Bearer {{ SECRET api_token }}"
# DO: Use EDXEnv for environment variables
header_expressions:
Authorization: Concat(["Bearer ", EDXEnv("API_TOKEN", "")], "")
# DON'T: Hardcode secrets
headers:
Authorization: Bearer sk_live_abc123 # Never do this!
Note: Create secrets in the Edge Delta UI under Settings > Secrets. Use lowercase names with underscores (e.g., api_token, oauth_client_secret).
Troubleshooting
No Data Retrieved
- Verify the endpoint is accessible: test with curl first
- Check authentication: ensure env vars and secrets are set
- Review step names: references are case-sensitive
- Enable debug logging: log: level: debug
Token Not Being Cached
- Verify Redis is reachable: redis-cli ping
- Check that check_before_request: true is set
- Ensure key_expression evaluates to a string
- Confirm the TTL is less than the token expiry
Iterator Not Working
- Verify source returns an array (not an object)
- Check that the previous step completed successfully
- Try continue_on_error: true for partial success
Webhook Not Receiving Requests
- Check port availability: lsof -i:8888
- Verify the firewall allows incoming connections
- Confirm auth headers match the configuration
- Check agent logs for 401 or 429 responses
Testing Your Workflow
Before deploying, test endpoints manually:
# Test OAuth endpoint
curl -X POST "https://auth.example.com/oauth/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials&client_id=<YOUR_CLIENT_ID>&client_secret=<YOUR_CLIENT_SECRET>"
# Test data endpoint with token
curl "https://api.example.com/data" \
-H "Authorization: Bearer <YOUR_ACCESS_TOKEN>"
# Test webhook locally
curl -X POST "http://localhost:8888/webhooks/test" \
-H "Authorization: Bearer <YOUR_WEBHOOK_SECRET>" \
-H "Content-Type: application/json" \
-d '{"event": "test", "user_id": "123"}'
See Also
- HTTP Pull Source — Single-request HTTP polling
- OTTL Language Guide — Complete expression reference