Edge Delta HTTP Workflow Source

Configure HTTP Workflow sources to execute multi-step API orchestration with support for OAuth flows, conditional logic, iterators, and Redis caching.


Overview

HTTP Workflow chains multiple HTTP requests together, where each step can use data from previous responses. Use it when you need to:

  • Authenticate first, then fetch data (OAuth flows)
  • Get a list, then fetch details for each item (fan-out patterns)
  • React to webhooks with multi-step processing
  • Cache tokens or responses in Redis

Not sure which to use?

| Use Case | Source |
| --- | --- |
| Single API endpoint, simple polling | HTTP Pull |
| Multiple dependent requests, OAuth, webhooks | HTTP Workflow (this page) |
  • Outgoing data types: log
  • Minimum version: v2.8.0

Quick Start: Your First Workflow

Get a working workflow in under 2 minutes.

Simplest Example: Poll a Public API

nodes:
- name: github_status
  type: http_workflow_input
  workflow_pull_interval: 5m
  steps:
    - name: get_status
      endpoint: https://www.githubstatus.com/api/v2/status.json

That’s it. This workflow:

  1. Runs every 5 minutes
  2. Fetches GitHub’s status API
  3. Emits the JSON response as a log item

HTTP Workflow node in pipeline builder connected to destination

Add Authentication

Most APIs require authentication. Add a header using a secret reference:

nodes:
- name: my_api_workflow
  type: http_workflow_input
  workflow_pull_interval: 5m
  steps:
    - name: fetch_data
      endpoint: https://api.example.com/v1/data
      headers:
        Authorization: "Bearer {{ SECRET api_token }}"
        Accept: application/json

The {{ SECRET api_token }} references a secret named api_token stored in Edge Delta's secrets management. Create secrets in the Edge Delta UI under Settings > Secrets.

Step configuration panel showing authentication headers

Core Concepts

Before diving deeper, understand these three building blocks:

flowchart LR
  classDef trigger fill:#E3F8EE,stroke:#1B7F5E,color:#0F3B2E;
  classDef step fill:#E7F0FB,stroke:#2563EB,color:#1E3A8A;
  classDef cache fill:#ECEBFF,stroke:#4338CA,color:#312E81;
  classDef output fill:#FCEADB,stroke:#EA580C,color:#7C2D12;
  T["Trigger<br/>(poll/schedule/webhook)"]
  S1["Step 1<br/>HTTP Request"]
  S2["Step 2<br/>Uses Step 1 Data"]
  SN["Step N<br/>Final Request"]
  C[("Redis Cache<br/>(optional)")]
  O["Emit Log"]
  T --> S1
  S1 --> S2
  S2 -.-> SN
  SN --> O
  C <-.-> S1
  C <-.-> S2
  C <-.-> SN
  class T trigger;
  class S1,S2,SN step;
  class C cache;
  class O output;
HTTP Workflow execution model

1. Triggers: What Starts the Workflow

Choose one trigger type:

| Trigger | When to Use | Example |
| --- | --- | --- |
| workflow_pull_interval | Regular polling | workflow_pull_interval: 5m |
| workflow_pull_schedule | Specific times (cron) | workflow_pull_schedule: "0 9 * * 1-5" |
| webhook | React to external events | Incoming HTTP POST |
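The same steps can run under any of these triggers by swapping a single block. A sketch of the two non-default variants (endpoints and the cron value are illustrative):

```yaml
nodes:
# Cron trigger: run weekdays at 09:00
- name: scheduled_workflow
  type: http_workflow_input
  workflow_pull_schedule: "0 9 * * 1-5"
  steps:
    - name: fetch_report
      endpoint: https://api.example.com/daily-report

# Webhook trigger: run on each incoming POST instead of polling
- name: event_workflow
  type: http_workflow_input
  webhook:
    path: /webhooks/events
    port: 8888
  steps:
    - name: process_event
      endpoint: https://api.example.com/process
```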

2. Steps: The Requests in Your Workflow

Each step is an HTTP request. Steps run sequentially, and each step can access data from previous steps.

steps:
  - name: step_one    # First request
    endpoint: https://api.example.com/auth

  - name: step_two    # Uses data from step_one
    endpoint: https://api.example.com/data
    header_expressions:
      Authorization: Concat(["Bearer ", steps["step_one"].body["token"]], "")

3. Variables: Passing Data Between Steps

Access previous step results using these patterns:

| What You Need | Expression | Example |
| --- | --- | --- |
| Response body field | steps["name"].body["field"] | steps["auth"].body["token"] |
| HTTP status code | steps["name"].status_code | steps["fetch"].status_code |
| Response header | steps["name"].headers["header"] | steps["auth"].headers["x-request-id"] |
| Global variable | variables["key"] | variables["api_base"] |
| Environment variable | EDXEnv("NAME", "default") | EDXEnv("API_KEY", "") |

Warning: All expressions using Concat, steps[], etc. must be written on a single line. Multi-line expressions will fail validation.
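As a sketch, a single step can combine several of these access patterns; the endpoint, field names, and the upstream step name list_items are hypothetical, and every expression stays on one line:

```yaml
- name: fetch_details
  # Build the URL from a global variable plus a field from an earlier step
  endpoint_expression: Concat([variables["api_base"], "/items/", steps["list_items"].body["first_id"]], "")
  header_expressions:
    Authorization: Concat(["Bearer ", EDXEnv("API_TOKEN", "")], "")
    X-Request-ID: steps["list_items"].headers["x-request-id"]
  # Skip this step unless the previous request succeeded
  run_condition: steps["list_items"].status_code == 200
```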

Common Patterns

Pattern 1: OAuth Token + Data Fetch

The most common pattern: get an OAuth token, then use it to fetch data.

sequenceDiagram
  participant Agent as Edge Delta
  participant Redis as Redis Cache
  participant Auth as Auth Server
  participant API as Data API
  Agent->>Redis: Check for cached token
  alt Cache Hit
    Redis-->>Agent: Return cached token
  else Cache Miss
    Agent->>Auth: POST /oauth/token
    Auth-->>Agent: access_token
    Agent->>Redis: Cache token (TTL: 50m)
  end
  Agent->>API: GET /data + Bearer token
  API-->>Agent: Response data
OAuth authentication flow
nodes:
- name: oauth_workflow
  type: http_workflow_input
  workflow_pull_interval: 15m

  # Redis for token caching (optional but recommended)
  redis_address: "redis://localhost:6379"

  # Global variables for reuse (secrets resolved at runtime)
  variables:
    client_id: "{{ SECRET oauth_client_id }}"
    client_secret: "{{ SECRET oauth_client_secret }}"

  steps:
    # Step 1: Get OAuth token (cached for 50 minutes)
    - name: get_token
      endpoint: https://auth.example.com/oauth/token
      method: POST
      headers:
        Content-Type: application/x-www-form-urlencoded
      request_body_expression: Concat(["grant_type=client_credentials&client_id=", variables["client_id"], "&client_secret=", variables["client_secret"]], "")
      redis_cache:
        key_expression: '"oauth-token"'
        ttl: 50m
        check_before_request: true
        populate_on_success: true

    # Step 2: Fetch data using the token
    - name: fetch_data
      endpoint: https://api.example.com/v1/data
      header_expressions:
        Authorization: Concat(["Bearer ", steps["get_token"].body["access_token"]], "")

Two-step OAuth workflow configuration with get_token and fetch_data steps

Why cache the token?

  • OAuth tokens typically last 1 hour
  • Caching for 50 minutes avoids re-authenticating every request
  • If cache is empty or expired, the HTTP request runs automatically

Pattern 2: Webhook-Triggered Processing

Process incoming webhooks and enrich data from other APIs.

flowchart LR
  classDef external fill:#E3F8EE,stroke:#1B7F5E,color:#0F3B2E;
  classDef security fill:#EAF4F3,stroke:#14532D,color:#0F3B2E;
  classDef process fill:#E7F0FB,stroke:#2563EB,color:#1E3A8A;
  classDef success fill:#E3F8EE,stroke:#1B7F5E,color:#0F3B2E;
  classDef reject fill:#FEE2E2,stroke:#DC2626,color:#7F1D1D;
  WH["Webhook<br/>Request"]
  RL{"Rate<br/>Limit?"}
  AU{"Auth<br/>Valid?"}
  S1["Enrich Data"]
  S2["Notify Slack"]
  OK["202 Accepted"]
  TM["429 Too Many"]
  UN["401 Unauthorized"]
  WH --> RL
  RL -->|OK| AU
  RL -->|Exceeded| TM
  AU -->|Yes| S1
  AU -->|No| UN
  S1 --> S2
  S2 --> OK
  class WH external;
  class RL,AU security;
  class S1,S2 process;
  class OK success;
  class TM,UN reject;
Webhook processing flow
nodes:
- name: alert_webhook
  type: http_workflow_input

  webhook:
    path: /webhooks/alerts
    port: 8888
    auth_type: bearer
    bearer_token: "{{ SECRET webhook_bearer_token }}"
    rate_limit_enabled: true
    rate_limit_max_per_min: 100

  steps:
    # Access webhook payload with webhook_payload["field"]
    - name: enrich_alert
      endpoint_expression: Concat(["https://api.example.com/users/", webhook_payload["user_id"]], "")
      headers:
        Authorization: "Bearer {{ SECRET api_token }}"

    - name: send_to_slack
      endpoint: "https://hooks.slack.com/services/{{ SECRET slack_webhook_path }}"
      method: POST
      headers:
        Content-Type: application/json
      request_body_expression: Concat(["{\"text\":\"Alert from ", steps["enrich_alert"].body["username"], ": ", webhook_payload["message"], "\"}"], "")

Webhook configuration panel with path, authentication, and rate limiting options

Webhook authentication options:

| Auth Type | Configuration | Caller Sends |
| --- | --- | --- |
| bearer | bearer_token: "{{ SECRET webhook_token }}" | Authorization: Bearer <token> |
| api_key | api_key: "{{ SECRET webhook_api_key }}", api_key_header: X-API-Key | X-API-Key: <key> |
| hmac | hmac_secret: "{{ SECRET hmac_secret }}", hmac_header: X-Signature | HMAC signature of body |
| none | (no config needed) | Nothing required |
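The examples on this page use bearer auth; an api_key variant would look like the following sketch (the path and secret name are illustrative):

```yaml
webhook:
  path: /webhooks/alerts
  port: 8888
  auth_type: api_key
  api_key: "{{ SECRET webhook_api_key }}"   # secret name is an example
  api_key_header: X-API-Key                 # header the caller must send
```

Callers would then include the key in that header, e.g. curl -H "X-API-Key: <key>".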

Pattern 3: Fan-Out (Get List, Then Details)

Fetch a list of items, then get details for each item in parallel.

flowchart TD
  classDef list fill:#E3F8EE,stroke:#1B7F5E,color:#0F3B2E;
  classDef split fill:#E9F5F4,stroke:#0F766E,color:#0F3B2E;
  classDef item fill:#E7F0FB,stroke:#2563EB,color:#1E3A8A;
  classDef merge fill:#E9F5F4,stroke:#0F766E,color:#0F3B2E;
  classDef output fill:#FCEADB,stroke:#EA580C,color:#7C2D12;
  L["Get User List"]
  F{"Fan-Out<br/>max_parallel: 10"}
  U1["User 1 Details"]
  U2["User 2 Details"]
  UN["User N Details"]
  M["Collect Results"]
  E["Emit Logs"]
  L --> F
  F --> U1 & U2 & UN
  U1 & U2 & UN --> M
  M --> E
  class L list;
  class F,M split;
  class U1,U2,UN item;
  class E output;
Fan-out pattern
nodes:
- name: user_details_workflow
  type: http_workflow_input
  workflow_pull_interval: 30m

  steps:
    # Step 1: Get list of users
    - name: get_users
      endpoint: https://api.example.com/users?limit=50
      headers:
        Authorization: "Bearer {{ SECRET api_token }}"

    # Step 2: For each user, get their profile
    - name: get_profile
      endpoint_expression: Concat(["https://api.example.com/users/", iterator_item["id"], "/profile"], "")
      headers:
        Authorization: "Bearer {{ SECRET api_token }}"
      is_last_step: true
      iterator:
        source: ParseJSON(steps["get_users"].body)["users"]
        variable_name: iterator_item
        max_parallel: 10
        continue_on_error: true

Iterator configuration with source expression, variable name, and max_parallel settings

Inside an iterator step, you can access:

| Variable | Description |
| --- | --- |
| iterator_item | Current item from the list |
| iterator_item["field"] | Field from current item |
| iterator_index | Zero-based index (0, 1, 2…) |

Warning: With max_parallel: 10, you’ll make 10 concurrent requests. If the target API has rate limits, lower this value or add retry_http_code: [429] to handle rate limit responses.
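A gentler variant of the fan-out step, sketched for a rate-limited API (the concurrency value is illustrative):

```yaml
- name: get_profile
  endpoint_expression: Concat(["https://api.example.com/users/", iterator_item["id"], "/profile"], "")
  headers:
    Authorization: "Bearer {{ SECRET api_token }}"
  retry_http_code: [429]      # retry requests that hit the rate limit
  is_last_step: true
  iterator:
    source: ParseJSON(steps["get_users"].body)["users"]
    variable_name: iterator_item
    max_parallel: 2           # stay well under the API's rate limit
    continue_on_error: true
```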

Fan-Out with Per-Item Caching

Cache each item’s response to avoid redundant API calls on subsequent runs. If a user’s profile was fetched recently, skip the API call and use the cached version.

flowchart TD
  classDef list fill:#E3F8EE,stroke:#1B7F5E,color:#0F3B2E;
  classDef cache fill:#ECEBFF,stroke:#4338CA,color:#312E81;
  classDef api fill:#E7F0FB,stroke:#2563EB,color:#1E3A8A;
  classDef skip fill:#F3F4F6,stroke:#9CA3AF,color:#6B7280;
  L["Get 100 Users"]
  F{"Fan-Out<br/>100 items"}
  C1{"User 1<br/>Cached?"}
  C2{"User 2<br/>Cached?"}
  CN{"User N<br/>Cached?"}
  HIT1["Use Cached"]
  HIT2["Use Cached"]
  API["Fetch from API"]
  SAVE["Cache Response"]
  L --> F
  F --> C1
  F --> C2
  F --> CN
  C1 -->|Hit| HIT1
  C2 -->|Hit| HIT2
  CN -->|Miss| API
  API --> SAVE
  class L list;
  class C1,C2,CN,SAVE cache;
  class API api;
  class HIT1,HIT2 skip;
Fan-out with per-item caching reduces API calls
nodes:
- name: cached_fan_out
  type: http_workflow_input
  workflow_pull_interval: 30m
  redis_address: "redis://localhost:6379"

  steps:
    - name: get_users
      endpoint: https://api.example.com/users
      headers:
        Authorization: "Bearer {{ SECRET api_token }}"

    - name: get_profile
      endpoint_expression: Concat(["https://api.example.com/users/", iterator_item["id"], "/profile"], "")
      headers:
        Authorization: "Bearer {{ SECRET api_token }}"
      is_last_step: true
      iterator:
        source: ParseJSON(steps["get_users"].body)["users"]
        variable_name: iterator_item
        max_parallel: 10
      # Cache each user's profile by their ID
      redis_cache:
        key_expression: Concat(["user-profile-", iterator_item["id"]], "")
        ttl: 1h
        check_before_request: true      # Check cache first
        populate_on_success: true       # Cache new responses

Result: On the first run, all 100 profiles are fetched. On subsequent runs within the TTL, only new/changed users trigger API calls—cached profiles are reused.

Pattern 3b: Caching Expensive API Calls

Some APIs are rate-limited, slow, or have per-call costs. Cache responses to minimize calls.

flowchart LR
  classDef check fill:#FEF3C7,stroke:#D97706,color:#78350F;
  classDef cache fill:#ECEBFF,stroke:#4338CA,color:#312E81;
  classDef api fill:#E7F0FB,stroke:#2563EB,color:#1E3A8A;
  classDef cost fill:#FEE2E2,stroke:#DC2626,color:#7F1D1D;
  REQ["Workflow<br/>Triggered"]
  CHK{"Cache<br/>Valid?"}
  HIT["Use Cached<br/>FREE"]
  MISS["Call API<br/>$0.01"]
  SAVE[("Save to<br/>Cache")]
  REQ --> CHK
  CHK -->|Hit| HIT
  CHK -->|Miss| MISS
  MISS --> SAVE
  class CHK check;
  class HIT,SAVE cache;
  class MISS cost;
Caching reduces expensive API calls
nodes:
- name: expensive_api_workflow
  type: http_workflow_input
  workflow_pull_interval: 5m           # Check every 5 minutes
  redis_address: "redis://localhost:6379"

  steps:
    # This API costs $0.01 per call - cache for 1 hour
    - name: fetch_expensive_data
      endpoint: https://api.expensive-service.com/data
      headers:
        Authorization: "Bearer {{ SECRET expensive_api_key }}"
      redis_cache:
        key_expression: '"expensive-data-cache"'
        ttl: 1h                         # Cache for 1 hour
        check_before_request: true      # Always check cache first
        populate_on_success: true       # Store successful responses

    # This API is rate-limited to 10 req/min - cache for 5 minutes
    - name: fetch_rate_limited
      endpoint: https://api.strict-limits.com/metrics
      headers:
        Authorization: "Bearer {{ SECRET metrics_api_key }}"
      redis_cache:
        key_expression: '"metrics-cache"'
        ttl: 5m
        check_before_request: true
        populate_on_success: true

Cost savings example:

| Scenario | API Calls/Day | Cost @ $0.01/call |
| --- | --- | --- |
| No caching (every 5 min) | 288 | $2.88 |
| 1-hour cache | 24 | $0.24 |
| Savings | 264 calls | $2.64/day |

Pattern 4: Conditional Execution

Skip steps based on previous results.

steps:
  - name: check_status
    endpoint: https://api.example.com/status

  # Only runs if status is "active"
  - name: fetch_active_data
    endpoint: https://api.example.com/active-data
    run_condition: steps["check_status"].body["status"] == "active"

  # Only runs if status is NOT "active"
  - name: send_alert
    endpoint: https://alerts.example.com/webhook
    method: POST
    run_condition: steps["check_status"].body["status"] != "active"
    request_body: '{"message": "Service is not active"}'
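run_condition can also branch on status codes rather than body fields. A sketch (the alert endpoint is illustrative):

```yaml
  # Only runs if the status check itself failed
  - name: report_outage
    endpoint: https://alerts.example.com/webhook
    method: POST
    run_condition: steps["check_status"].status_code != 200
    request_body: '{"message": "Status endpoint unreachable"}'
```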

Pattern 5: State Persistence (Markers/Cursors)

Persist state between workflow runs using Redis-only steps. Common use cases:

  • Pagination cursors: Remember where you left off
  • Timestamps: Only fetch records since last run
  • Deduplication: Track processed record IDs
flowchart LR
  classDef redis fill:#ECEBFF,stroke:#4338CA,color:#312E81;
  classDef api fill:#E7F0FB,stroke:#2563EB,color:#1E3A8A;
  classDef output fill:#FCEADB,stroke:#EA580C,color:#7C2D12;
  R1[("Redis<br/>Read Marker")]
  API["Fetch Records<br/>since: marker"]
  R2[("Redis<br/>Save New Marker")]
  OUT["Emit Logs"]
  R1 -->|"last_timestamp"| API
  API --> R2
  API --> OUT
  R2 -.->|"next run"| R1
  class R1,R2 redis;
  class API api;
  class OUT output;
State persistence with Redis markers
nodes:
- name: incremental_sync
  type: http_workflow_input
  workflow_pull_interval: 5m
  redis_address: "redis://localhost:6379"

  steps:
    # Step 1: Read last processed timestamp from Redis (no HTTP call)
    - name: get_marker
      redis_cache:
        key_expression: '"last-sync-timestamp"'
        check_before_request: true

    # Step 2: Fetch new records since the marker
    - name: fetch_new_records
      endpoint: https://api.example.com/records
      parameter_expressions:
        since: steps["get_marker"].body
        limit: '"100"'
      headers:
        Authorization: "Bearer {{ SECRET api_token }}"

    # Step 3: Save the new marker to Redis (no HTTP call)
    - name: save_marker
      redis_cache:
        key_expression: '"last-sync-timestamp"'
        value_expression: steps["fetch_new_records"].body["latest_timestamp"]
        populate_on_success: true
        ttl: 720h  # 30 days

How Redis-only steps work:

| Step Type | Configuration | Behavior |
| --- | --- | --- |
| Read marker | check_before_request: true, no endpoint | Reads value from Redis, available as steps["name"].body |
| Write marker | populate_on_success: true + value_expression, no endpoint | Writes computed value to Redis |

Note: Steps with redis_cache configured but no endpoint are Redis-only steps. They don’t make HTTP calls—they only interact with Redis. Use value_expression to specify what to store.

Redis Caching

Cache responses to avoid redundant API calls. Essential for OAuth tokens.

flowchart TD
  classDef decision fill:#FEF3C7,stroke:#D97706,color:#78350F;
  classDef action fill:#E7F0FB,stroke:#2563EB,color:#1E3A8A;
  classDef cache fill:#ECEBFF,stroke:#4338CA,color:#312E81;
  classDef terminal fill:#E3F8EE,stroke:#1B7F5E,color:#0F3B2E;
  START(["Step Begins"])
  CHECK{"check_before_request?"}
  LOOKUP["Lookup Cache Key"]
  HIT{"Cache Hit?"}
  USE["Use Cached Response"]
  EXEC["Execute HTTP Request"]
  POP{"populate_on_success?"}
  STORE["Store in Cache"]
  DONE(["Continue Workflow"])
  START --> CHECK
  CHECK -->|No| EXEC
  CHECK -->|Yes| LOOKUP
  LOOKUP --> HIT
  HIT -->|Yes| USE
  HIT -->|No| EXEC
  USE --> DONE
  EXEC --> POP
  POP -->|Yes| STORE
  POP -->|No| DONE
  STORE --> DONE
  class CHECK,HIT,POP decision;
  class LOOKUP,EXEC action;
  class USE,STORE cache;
  class START,DONE terminal;
Cache decision flow

Quick Setup

nodes:
- name: cached_workflow
  type: http_workflow_input
  workflow_pull_interval: 5m

  # Global Redis connection
  redis_address: "redis://localhost:6379"
  redis_password: "{{ SECRET redis_password }}"  # Optional
  redis_tls: false

  steps:
    - name: cached_request
      endpoint: https://api.example.com/data
      redis_cache:
        key_expression: '"my-cache-key"'      # Cache key (must evaluate to string)
        ttl: 10m                               # Cache duration
        check_before_request: true             # Check cache first
        populate_on_success: true              # Store response on success

Redis cache configuration with key expression, TTL, and cache behavior options

Cache Configuration Reference

| Field | Type | Description |
| --- | --- | --- |
| key_expression | string | OTTL expression for cache key |
| ttl | duration | Time-to-live (5m, 1h, 720h, etc.) |
| check_before_request | bool | If true, check cache before HTTP request |
| populate_on_success | bool | If true, cache response after successful request |
| value_expression | string | OTTL expression for value to store (required for Redis-only SET steps) |
| skip_remaining_on_hit | bool | If true, skip all remaining steps on cache hit |
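skip_remaining_on_hit is the one field above not shown in the earlier patterns. A sketch of how it might be used (endpoint and key are illustrative):

```yaml
- name: get_summary
  endpoint: https://api.example.com/summary
  redis_cache:
    key_expression: '"summary-cache"'
    ttl: 30m
    check_before_request: true
    populate_on_success: true
    skip_remaining_on_hit: true   # on a cache hit, later steps do not run
```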

Complete Reference

Node-Level Parameters

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| name | string | Yes | - | Unique node identifier |
| type | string | Yes | - | Must be http_workflow_input |
| steps | array | Yes | - | List of HTTP request steps |
| workflow_pull_interval | duration | One trigger required | 1m | Polling interval |
| workflow_pull_schedule | string | One trigger required | - | Cron expression |
| webhook | object | One trigger required | - | Webhook configuration |
| global_timeout | duration | No | 2m | Max workflow execution time |
| emit_intermediate | bool | No | false | Emit logs for non-final steps |
| variables | map | No | - | Global variables for all steps |
| redis_address | string | No | - | Redis connection string |
| redis_password | string | No | - | Redis password |
| redis_tls | bool | No | false | Enable TLS for Redis |
| redis_db | int | No | 0 | Redis database number |
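A sketch combining the less common node-level parameters (endpoint and values are illustrative):

```yaml
nodes:
- name: tuned_workflow
  type: http_workflow_input
  workflow_pull_interval: 10m
  global_timeout: 5m          # abort the whole run after 5 minutes
  emit_intermediate: true     # emit a log for every step, not just the last
  redis_address: "redis://localhost:6379"
  redis_tls: false
  redis_db: 1                 # keep workflow keys in a dedicated Redis DB
  steps:
    - name: fetch
      endpoint: https://api.example.com/data
```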

Step-Level Parameters

| Parameter | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| name | string | Yes | - | Unique step identifier |
| endpoint | string | One required | - | Static URL |
| endpoint_expression | string | One required | - | Dynamic URL via OTTL |
| method | string | No | GET | HTTP method |
| headers | map | No | - | Static headers |
| header_expressions | map | No | - | Dynamic headers via OTTL |
| parameters | map | No | - | Query parameters |
| parameter_expressions | map | No | - | Dynamic query params via OTTL |
| request_body | string | No | - | Static request body |
| request_body_expression | string | No | - | Dynamic body via OTTL |
| request_timeout | duration | No | 30s | Timeout for this step |
| run_condition | string | No | - | OTTL condition to execute step |
| retry_http_code | array[int] | No | - | HTTP codes that trigger retry |
| iterator | object | No | - | Fan-out configuration |
| redis_cache | object | No | - | Caching configuration |
| is_last_step | bool | No | false | Mark as terminal step |

Webhook Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| path | string | Required | URL path (e.g., /webhooks/alerts) |
| port | int | 8080 | HTTP server port |
| auth_type | string | none | none, bearer, api_key, hmac |
| bearer_token | string | - | Token for bearer auth |
| api_key | string | - | Key for API key auth |
| api_key_header | string | X-API-Key | Header name for API key |
| hmac_secret | string | - | Secret for HMAC validation |
| hmac_header | string | X-Webhook-Signature | Header with HMAC signature |
| rate_limit_enabled | bool | false | Enable rate limiting |
| rate_limit_max_per_min | int | 60 | Max requests per minute |
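An hmac-authenticated webhook with rate limiting, as a sketch (the path and secret name are illustrative):

```yaml
webhook:
  path: /webhooks/secure
  port: 8888
  auth_type: hmac
  hmac_secret: "{{ SECRET hmac_secret }}"
  hmac_header: X-Webhook-Signature   # caller puts the body's HMAC here
  rate_limit_enabled: true
  rate_limit_max_per_min: 60
```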

Iterator Configuration

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| source | string | Required | OTTL expression returning array |
| variable_name | string | iterator_item | Name for current item |
| max_parallel | int | 10 | Concurrent iterations |
| continue_on_error | bool | false | Continue if iteration fails |
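A custom variable_name can make iterator expressions easier to read. A sketch (the endpoint is illustrative):

```yaml
- name: get_profile
  endpoint_expression: Concat(["https://api.example.com/users/", user["id"], "/profile"], "")
  iterator:
    source: ParseJSON(steps["get_users"].body)["users"]
    variable_name: user     # current item is user instead of iterator_item
    max_parallel: 5
```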

Security

Built-in SSRF Protection

HTTP Workflow blocks requests to:

  • Localhost (127.0.0.1, ::1, localhost)
  • Private IPs (10.x, 172.16-31.x, 192.168.x)
  • Cloud metadata (169.254.169.254)
  • Non-HTTP schemes (file://, ftp://)

Credential Best Practices

# DO: Use Edge Delta secrets (recommended)
headers:
  Authorization: "Bearer {{ SECRET api_token }}"

# DO: Use EDXEnv for environment variables
header_expressions:
  Authorization: Concat(["Bearer ", EDXEnv("API_TOKEN", "")], "")

# DON'T: Hardcode secrets
headers:
  Authorization: Bearer sk_live_abc123  # Never do this!

Note: Create secrets in the Edge Delta UI: Settings > Secrets. Use lowercase names with underscores (e.g., api_token, oauth_client_secret).

Troubleshooting

No Data Retrieved

  1. Verify endpoint is accessible: Test with curl first
  2. Check authentication: Ensure referenced secrets and environment variables are set and resolve correctly
  3. Review step names: Case-sensitive when referenced
  4. Enable debug logging:
     log:
       level: debug

Token Not Being Cached

  1. Verify Redis is reachable: redis-cli ping
  2. Check check_before_request: true is set
  3. Ensure key_expression evaluates to a string
  4. Confirm TTL is less than token expiry

Iterator Not Working

  1. Verify source returns an array (not object)
  2. Check previous step completed successfully
  3. Try continue_on_error: true for partial success

Webhook Not Receiving Requests

  1. Check port availability: lsof -i:8888
  2. Verify firewall allows incoming connections
  3. Confirm auth headers match configuration
  4. Check agent logs for 401 or 429 responses

Testing Your Workflow

Before deploying, test endpoints manually:

# Test OAuth endpoint
curl -X POST "https://auth.example.com/oauth/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials&client_id=<YOUR_CLIENT_ID>&client_secret=<YOUR_CLIENT_SECRET>"

# Test data endpoint with token
curl "https://api.example.com/data" \
  -H "Authorization: Bearer <YOUR_ACCESS_TOKEN>"

# Test webhook locally
curl -X POST "http://localhost:8888/webhooks/test" \
  -H "Authorization: Bearer <YOUR_WEBHOOK_SECRET>" \
  -H "Content-Type: application/json" \
  -d '{"event": "test", "user_id": "123"}'

See Also