Edge Delta Kafka Source
3 minute read
Overview
The Kafka source node consumes messages from Kafka topics. It can be configured for TLS and SASL configurations.
- outgoing_data_types: log
Example Configuration
- name: my_kafka_input
type: kafka_input
endpoint: "example.kafka:9092"
topic: "example_topic_123"
group_id: "group_456a"
Required Parameters
name
A descriptive name for the node. This is the name that will appear in Visual Pipelines and you can reference this node in the YAML using the name. It must be unique across all nodes. It is a YAML list element so it begins with a -
and a space followed by the string. It is a required parameter for all nodes.
nodes:
- name: <node name>
type: <node type>
type: kafka_input
The type
parameter specifies the type of node being configured. It is specified as a string from a closed list of node types. It is a required parameter.
nodes:
- name: <node name>
type: <node type>
Optional Parameters
commit_interval
The commit_interval
parameter defines how often the agent commits offsets back to the Kafka broker. If set to 0
, commits will be handled synchronously. It is only used when GroupID
is set. It is an optional parameter.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"
commit_interval: "5s"
endpoint
The endpoint
parameter defines the Kafka broker from which the agent will collect data. Multiple brokers can be specified by comma separated values. It is an optional parameter.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"
group_id
The group_id
parameter specifies the consumer group ID within the Kafka topic for group management and offset tracking. It is an optional parameter.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"
max_batch_size
The max_batch_size
parameter defines the maximum number of messages the agent will poll in each batch. It is an optional parameter.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"
max_batch_size: "1000"
queue_capacity
The queue_capacity
parameter defines the maximum number of messages that can be queued internally by the Kafka reader component of the agent. It is an optional parameter.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"
queue_capacity: 5000
sasl
The sasl
block defines the authentication details such as username, password, and mechanism for SASL authentication with Kafka brokers. It is an optional parameter.
The following options can be set:
username
username for authenticationpassword
SASL password for authenticationmechanism
SASL mechanism for authentication. The mechanisms areplain
,scram-sha-256
, orscram-sha-512
.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"
sasl:
username: "user"
password: "mypassword"
mechanism: "scram-sha-256"
tls
The tls
configuration block enables detailed customization of SSL/TLS settings for secure communication with brokers, including certificate checks, file paths, client authentication type, and supported TLS versions. It is an optional parameter.
The following options can be set:
ignore_certificate_check
Toggles certificate verification withtrue
orfalse
. Use with caution.ca_file
absolute file path to the CA certificate for SSL/TLS connections.ca_path
absolute path where CA certificate files are located for SSL/TLS.crt_file
absolute path to the SSL/TLS certificate file for secure communication.key_file
absolute path to the private key file used in SSL/TLS connections.key_password
Optional password for the key file.client_auth_type
Client auth type:noclientcert
(default),requestclientcert
,requireanyclientcert
,verifyclientcertifgiven
, orrequireandverifyclientcert
.min_version
Minimum supported TLS version:TLSv1_0
,TLSv1_1
,TLSv1_2
(default), orTLSv1_3
.max_version
Maximum supported TLS version:TLSv1_0
,TLSv1_1
,TLSv1_2
, orTLSv1_3
.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"
tls:
ignore_certificate_check: false
ca_file: "/etc/edgedelta/ca.pem"
crt_file: "/etc/edgedelta/cert.pem"
key_file: "/etc/edgedelta/key.pem"
client_auth_type: "requireandverifyclientcert"
min_version: "TLSv1_2"
max_version: "TLSv1_3"
topic
The topic
parameter defines the Kafka topic from which the agent will collect data. It is an optional parameter.
- name: my_kafka_input
type: kafka_input
endpoint: "<endpoint>"
topic: "<topic>"
group_id: "<my-group>"