Edge Delta Kafka Input

Ingest logs from a Kafka topic.

Overview

The Kafka Input node consumes messages from Kafka topics. It supports TLS for encrypted connections and SASL for authentication.

  • outgoing_data_types: log

Example Configuration

  - name: my_kafka_input
    type: kafka_input
    endpoint: "example.kafka:9092"
    topic: "example_topic_123"  
    group_id: "group_456a"

Required Parameters

name

A descriptive name for the node. This name appears in Visual Pipelines, and you can use it to reference the node in the YAML. It must be unique across all nodes. Because it is a YAML list element, it begins with a - and a space, followed by the string. It is a required parameter for all nodes.

nodes:
  - name: <node name>
    type: <node type>

type: kafka_input

The type parameter specifies the type of node being configured. It is specified as a string from a closed list of node types. It is a required parameter.

nodes:
  - name: <node name>
    type: <node type>

endpoint

The endpoint parameter defines the Kafka broker from which the agent will collect data. Multiple brokers can be specified as a comma-separated list. It is a required parameter.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"
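
As a sketch of the comma-separated form described above, the fragment below lists two brokers in a single endpoint string. The host names broker1.example and broker2.example are placeholders rather than real endpoints, and the enclosing nodes key is included only so that the fragment is complete YAML.

```yaml
nodes:
  - name: my_kafka_input
    type: kafka_input
    # Two brokers in one comma-separated string (hypothetical hosts)
    endpoint: "broker1.example:9092,broker2.example:9092"
    topic: "<topic>"
    group_id: "<my-group>"
```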

topic

The topic parameter defines the Kafka topic from which the agent will collect data. It is a required parameter.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"

group_id

The group_id parameter specifies the Kafka consumer group ID that the agent uses for group management and offset tracking. It is a required parameter.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"

Optional Parameters

max_batch_size

The max_batch_size parameter defines the maximum number of messages the agent will poll in each batch. It is an optional parameter.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"
    max_batch_size: "1000"  

queue_capacity

The queue_capacity parameter defines the maximum number of messages that can be queued internally by the Kafka reader component of the agent. It is an optional parameter.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"
    queue_capacity: 5000

commit_interval

The commit_interval parameter defines how often the agent commits offsets back to the Kafka broker. If set to 0, commits are handled synchronously. It is only used when group_id is set. It is an optional parameter.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"
    commit_interval: "5s" 
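
The three optional tuning parameters above can presumably be combined on one node. The fragment below is a minimal sketch that reuses the example values from this page; the values are illustrative, not recommendations, and the enclosing nodes key is included only so that the fragment is complete YAML.

```yaml
nodes:
  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"
    group_id: "<my-group>"
    # Poll at most 1000 messages per batch
    max_batch_size: "1000"
    # Queue at most 5000 messages internally in the Kafka reader
    queue_capacity: 5000
    # Commit offsets to the broker every 5 seconds
    commit_interval: "5s"
```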

tls

The tls configuration block enables detailed customization of SSL/TLS settings for secure communication with brokers, including certificate checks, file paths, client authentication type, and supported TLS versions. The following options can be set:

  • ignore_certificate_check: Toggles certificate verification with true or false. Use with caution.
  • ca_file: Absolute file path to the CA certificate for SSL/TLS connections.
  • ca_path: Absolute path where CA certificate files are located for SSL/TLS.
  • crt_file: Absolute path to the SSL/TLS certificate file for secure communication.
  • key_file: Absolute path to the private key file used in SSL/TLS connections.
  • key_password: Optional password for the key file.
  • client_auth_type: Client authentication type: noclientcert (default), requestclientcert, requireanyclientcert, verifyclientcertifgiven, or requireandverifyclientcert.
  • min_version: Minimum supported TLS version: TLSv1_0, TLSv1_1, TLSv1_2 (default), or TLSv1_3.
  • max_version: Maximum supported TLS version: TLSv1_0, TLSv1_1, TLSv1_2, or TLSv1_3.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"
    tls:
      ignore_certificate_check: false  
      ca_file: "/etc/edgedelta/ca.pem"  
      crt_file: "/etc/edgedelta/cert.pem"  
      key_file: "/etc/edgedelta/key.pem"  
      client_auth_type: "requireandverifyclientcert" 
      min_version: "TLSv1_2"  
      max_version: "TLSv1_3" 
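
For a broker that only needs to be verified against a private CA, a smaller tls block may be enough. The fragment below is a sketch under the assumption that crt_file, key_file, and client_auth_type can be omitted when the broker does not require a client certificate; this page does not state that explicitly. The CA path is the same illustrative path used in the example above.

```yaml
nodes:
  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"
    group_id: "<my-group>"
    tls:
      # Keep certificate verification enabled
      ignore_certificate_check: false
      # CA used to verify the broker certificate (illustrative path)
      ca_file: "/etc/edgedelta/ca.pem"
      # Refuse connections below TLS 1.2
      min_version: "TLSv1_2"
```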

sasl

The sasl block defines the authentication details such as username, password, and mechanism for SASL authentication with Kafka brokers.

The following options can be set:

  • username: Username for SASL authentication.
  • password: Password for SASL authentication.
  • mechanism: SASL mechanism for authentication: plain, scram-sha-256, or scram-sha-512.

  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"  
    group_id: "<my-group>"
    sasl:
      username: "user"  
      password: "mypassword"  
      mechanism: "scram-sha-256"
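
Because the node supports both TLS and SASL, the two blocks can presumably be set together so that credentials travel over an encrypted connection. The fragment below is a sketch under that assumption; the user name, password, and CA path are placeholders, and the enclosing nodes key is included only so that the fragment is complete YAML.

```yaml
nodes:
  - name: my_kafka_input
    type: kafka_input
    endpoint: "<endpoint>"
    topic: "<topic>"
    group_id: "<my-group>"
    tls:
      # Verify the broker certificate against this CA (illustrative path)
      ca_file: "/etc/edgedelta/ca.pem"
      min_version: "TLSv1_2"
    sasl:
      # Placeholder credentials; replace with real values
      username: "user"
      password: "mypassword"
      mechanism: "scram-sha-512"
```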