Kafka Connector
Overview
The Kafka connector collects and streams data from Apache Kafka topics in real time. Apache Kafka is a distributed event streaming platform capable of handling trillions of events per day. Edge Delta acts as a Kafka consumer, ingesting telemetry data, application logs, events, and metrics. Content streams into Edge Delta Pipelines for analysis by AI teammates through the Edge Delta MCP connector.
The connector supports Apache Kafka 0.10+, Confluent Cloud, AWS MSK, and managed Kafka services. It handles consumer group management, offset tracking, and reliable message consumption.
When you add this streaming connector, it appears as a Kafka source in your selected pipeline. AI teammates access this data by querying the Edge Delta backend with the Edge Delta MCP connector.
Add the Kafka Connector
To add the Kafka connector, you configure Edge Delta as a Kafka consumer with broker addresses, topic names, and authentication credentials.
Prerequisites
Before configuring the connector, ensure you have:
- Access to an Apache Kafka cluster (self-hosted, Confluent Cloud, AWS MSK, or managed service)
- Kafka broker addresses and port numbers (typically 9092 for plaintext or 9093 for SSL/TLS)
- Authentication credentials if the cluster requires SASL or SSL/TLS
- Identified Kafka topics containing logs, metrics, or events to analyze
Configuration Steps
- Navigate to AI Team > Connectors in the Edge Delta application
- Find the Kafka connector in Streaming Connectors
- Click the connector card
- Configure the Endpoint with broker addresses (comma-separated)
- Specify the Topic to consume from
- Optionally configure Advanced Settings for Group ID, batch size, TLS, or SASL
- Select a target environment
- Click Save
The connector deploys and begins consuming messages from the specified Kafka topic.
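For reference, the deployed source can be pictured as pipeline YAML. The following is a minimal sketch only; the node type (kafka_input) and field names are illustrative assumptions, not the exact Edge Delta schema:

```yaml
# Minimal sketch of the resulting Kafka source in pipeline YAML.
# Node type and field names are illustrative assumptions.
nodes:
  - name: kafka_source
    type: kafka_input
    endpoint: kafka-broker-1:9092,kafka-broker-2:9092
    topic: application-logs
    group_id: edgedelta_consumer_group   # default consumer group
```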

Configuration Options
Connector Name
Name to identify this Kafka connector instance.
Endpoint
Kafka broker endpoint to collect data from. Multiple brokers can be specified as a comma-separated list.
Format: host1:port1,host2:port2,host3:port3
Examples:
- localhost:9092 - Single broker
- kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092 - Multiple brokers
- pkc-abc123.us-east-1.aws.confluent.cloud:9092 - Confluent Cloud
- b-1.mycluster.xyz.kafka.us-east-1.amazonaws.com:9092 - AWS MSK
Port Notes:
- 9092 - Plaintext connections
- 9093 - SSL/TLS encrypted connections
Topic
Kafka topic to collect data from. Specify the name of the topic to subscribe to.
Format: Topic name string
Examples:
- application-logs - Single topic
- system-metrics - Metrics topic
- user-events - Events topic
Advanced Settings
Group ID
Consumer group ID used when consuming from the specified topic. Kafka uses this to coordinate partition assignment and track consumption offsets.
Default: edgedelta_consumer_group
Examples:
- edgedelta-production - Production consumer group
- edgedelta-logs-consumer - Logs-specific group
- edgedelta-event-correlation - Event processing group
Note: All agents with the same Group ID share partition consumption. Use different Group IDs for independent consumption, as sketched below.
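The sketch below contrasts the two patterns, reusing the hypothetical kafka_input fields from the earlier sketch:

```yaml
# Shared Group ID: agents in this group split the topic's partitions.
- name: kafka_logs_balanced
  type: kafka_input                      # hypothetical node type
  topic: application-logs
  group_id: edgedelta-production         # same ID elsewhere -> messages are split

# Unique Group ID: this consumer independently receives every message.
- name: kafka_logs_independent
  type: kafka_input
  topic: application-logs
  group_id: edgedelta-event-correlation  # unique ID -> full copy of the topic
```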
Max Batch Size
Maximum batch size the agent should accept when reading from the topic.
Use Cases:
- Control memory usage
- Limit processing batch size
- Tune performance for message size
Queue Capacity
Capacity of the internal message queue of the Kafka reader component.
Use Cases:
- Buffer messages during processing
- Handle burst traffic
- Manage backpressure
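A hedged tuning sketch covering both Max Batch Size and Queue Capacity; the field names are assumptions, not the exact schema:

```yaml
# Illustrative tuning of batch size and reader queue; field names assumed.
- name: kafka_source
  type: kafka_input
  topic: application-logs
  max_batch_size: 1000    # bound memory by capping messages accepted per batch
  queue_capacity: 5000    # reader buffer: absorbs bursts, provides backpressure
```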
Commit Interval
Interval at which offsets are committed to the Kafka broker. If set to 0, commits are handled synchronously. Only used when a Group ID is set.
Format: Duration in milliseconds
Examples:
- 5000 - Commit every 5 seconds
- 10000 - Commit every 10 seconds
- 0 - Synchronous commits (every message)
Trade-offs:
- Lower values: More frequent commits, less data loss on failure, higher overhead
- Higher values: Less overhead, potential duplicate processing on restart
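Expressed as an illustrative snippet (commit_interval is an assumed field name):

```yaml
# Illustrative commit settings; only meaningful when group_id is set.
- name: kafka_source
  type: kafka_input
  topic: application-logs
  group_id: edgedelta-production
  commit_interval: 5000   # ms; lower = less re-read after failure, more overhead
  # commit_interval: 0    # synchronous commits after every message
```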
TLS
Optional TLS/SSL configuration for encrypted connections.
Configuration Options:
- Ignore Certificate Check: Disables SSL/TLS certificate verification. Use with caution in testing environments only.
- CA File: Absolute file path to the CA certificate for SSL/TLS connections
- CA Path: Absolute path where CA certificate files are located
- CRT File: Absolute path to the SSL/TLS certificate file
- Key File: Absolute path to the private key file
- Key Password: Optional password for the key file
- Client Auth Type: Client authentication type. Default is noclientcert.
- Minimum Version: Minimum TLS version. Default is TLSv1_2.
- Maximum Version: Maximum TLS version allowed for connections
When to Enable:
- Production deployments
- Public Kafka clusters
- Confluent Cloud
- AWS MSK with encryption
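An illustrative TLS block follows; the option names mirror the settings above, but the exact YAML keys are assumptions:

```yaml
# Illustrative TLS configuration for an encrypted broker connection.
- name: kafka_source
  type: kafka_input
  endpoint: kafka-broker-1:9093                 # TLS port
  topic: application-logs
  tls:
    ca_file: /etc/edgedelta/certs/ca.pem        # CA certificate
    crt_file: /etc/edgedelta/certs/client.pem   # client certificate
    key_file: /etc/edgedelta/certs/client.key   # client private key
    minimum_version: TLSv1_2
```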
SASL
SASL authentication settings for secure broker connections.
Configuration Options:
- Username: Username for SASL authentication
  - For Confluent Cloud: API key
  - For self-hosted: Configured username
- Password: Password for SASL authentication
  - For Confluent Cloud: API secret
  - For self-hosted: User password
- Mechanism: SASL mechanism for authentication
  - Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM
Mechanism Selection:
- PLAIN: Username/password over TLS (Confluent Cloud)
- SCRAM-SHA-256: Secure challenge-response (recommended)
- SCRAM-SHA-512: Higher security SCRAM variant
- AWS_MSK_IAM: IAM-based auth for AWS MSK
Security Note: Always use SASL with TLS for production. PLAIN mechanism requires TLS to protect credentials.
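For example, a hedged sketch of Confluent Cloud authentication (PLAIN over TLS); the keys and placeholders are illustrative:

```yaml
# Illustrative SASL + TLS pairing for Confluent Cloud; keys assumed.
- name: kafka_confluent
  type: kafka_input
  endpoint: pkc-abc123.us-east-1.aws.confluent.cloud:9092
  topic: application-logs
  tls:
    enabled: true               # assumed flag; PLAIN must travel over TLS
  sasl:
    username: <CONFLUENT_API_KEY>
    password: <CONFLUENT_API_SECRET>
    mechanism: PLAIN
```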
Metadata Level
This option defines which detected resources and attributes are added to each data item as it is ingested by Edge Delta. You can select:
- Required Only: This option includes the minimum required resources and attributes for Edge Delta to operate.
- Default: This option includes the required resources and attributes plus those selected by Edge Delta.
- High: This option includes the required resources and attributes along with a larger selection of common optional fields.
- Custom: With this option selected, you can choose which attributes and resources to include. The required fields are selected by default and can’t be unchecked.
Based on your selection in the GUI, the source_metadata YAML is populated as two dictionaries (resource_attributes and attributes) with Boolean values.
See Choose Data Item Metadata for more information on selecting metadata.
Kafka-specific metadata included:
- messaging.kafka.consumer.group - Consumer group ID
- messaging.kafka.destination.partition - Partition number
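An illustrative source_metadata block showing the shape described above (the exact key set depends on the selected metadata level):

```yaml
# Illustrative shape of source_metadata; exact keys vary by level.
source_metadata:
  resource_attributes: {}   # required resource keys omitted in this sketch
  attributes:
    messaging.kafka.consumer.group: true         # consumer group ID
    messaging.kafka.destination.partition: true  # partition number
```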
Rate Limit
Rate limit configuration to control message consumption rate and manage processing capacity.
Target Environments
Select the Edge Delta pipeline (environment) where you want to deploy this connector.
Consumer Group Behavior:
- Same Group ID across environments: Load-balanced consumption (messages split)
- Different Group IDs: Independent consumption (all messages to each)
How to Use the Kafka Connector
The Kafka connector integrates seamlessly with AI Team, enabling real-time analysis of streaming data from Kafka topics. AI teammates automatically leverage the ingested data based on the queries they receive and the context of the conversation.
Use Case: Real-Time Application Log Analysis
Collect application logs from microservices publishing to Kafka topics for immediate analysis. AI teammates identify error patterns, correlate with deployment events, and provide resolution recommendations within seconds. When combined with PagerDuty alerts, teammates automatically query Kafka-sourced logs during incident investigation to identify root causes across distributed services.
Configuration: Endpoint: kafka-prod:9092,kafka-prod-2:9092, Topic: app-logs-production, Group ID: edgedelta-prod-logs, SASL: SCRAM-SHA-256, TLS: Enabled
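The same configuration expressed as an illustrative pipeline snippet (same hypothetical schema as the earlier sketches):

```yaml
# Illustrative end-to-end source for the production log use case.
- name: kafka_prod_logs
  type: kafka_input
  endpoint: kafka-prod:9092,kafka-prod-2:9092
  topic: app-logs-production
  group_id: edgedelta-prod-logs
  tls:
    enabled: true             # assumed flag for enabling TLS
  sasl:
    username: <username>
    password: <password>
    mechanism: SCRAM-SHA-256
```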
Use Case: Multi-Service Event Correlation
Correlate events across multiple Kafka topics to reconstruct user journeys and identify failure points in complex workflows. AI teammates trace sessions across services (auth, orders, payments), identify where transactions fail, and pinpoint root causes even when failures manifest in downstream services. This is valuable for troubleshooting e-commerce checkouts or multi-step business processes.
Configuration: Configure separate connectors for each topic (user-events, order-events, payment-events) with the same Group ID, or use different Group IDs for independent consumption.
Use Case: System Metrics Anomaly Detection
Monitor metrics flowing through Kafka (API response times, connection pools, error rates) for anomalies indicating performance degradation. AI teammates analyze current values against baselines, identify deviations, and correlate anomalies across systems. When combined with Jira integration, teammates automatically document performance issues by querying metric trends and creating tickets with diagnostic details.
Configuration: Endpoint: confluent-cloud:9092, Topic: system-metrics, SASL: PLAIN (Confluent API key/secret), TLS: Enabled
Troubleshooting
Connection failures: Verify broker addresses are reachable from the agent network (telnet kafka-broker:9092). Confirm topics exist (kafka-topics --list --bootstrap-server <broker>). Check that the consumer group is connecting (kafka-consumer-groups --describe --group <group> --bootstrap-server <broker>).
SASL authentication failed: Verify SASL mechanism matches cluster configuration. For Confluent Cloud, use PLAIN with API key as username and API secret as password. For AWS MSK IAM, ensure agent IAM role has Kafka cluster permissions. Check credentials for typos (case-sensitive).
SSL handshake failed: Verify brokers are configured for TLS on the specified port (typically 9093). Check certificate validity (openssl s_client -connect kafka-broker:9093 -showcerts). For self-signed certificates, enable Ignore Certificate Check (testing only). For production, ensure Edge Delta trusts your CA.
High consumer lag: Increase Edge Delta agents to distribute partition consumption. Optimize the processing pipeline (simplify regex, reduce aggregations). Increase the topic partition count for greater parallelism. Monitor partition assignment (kafka-consumer-groups --describe).
Offset out of range: The committed offset points to a deleted message (retention expired). Reset the offset to Latest (skip the gap) or Earliest (oldest available). Delete consumer group offsets to force a new strategy (kafka-consumer-groups --delete --group <group> --bootstrap-server <broker>).
Messages malformed/unparsed: Verify the message format matches expectations (kafka-console-consumer --topic <topic> --bootstrap-server <broker> --max-messages 5). Check that messages are valid JSON without extra text. For non-JSON formats (Avro, Protobuf), add custom processing to deserialize.
Messages split between pipelines: Multiple deployments sharing the same Group ID cause Kafka to distribute partitions (load balancing). For independent consumption, use different Group IDs per pipeline. For load balancing, use the same Group ID with enough partitions for parallelism.
Next Steps
- Learn about Kafka input node for advanced configuration and offset management
- Explore Kafka output preparation for sending data to Kafka
- Learn about creating custom teammates that can use Kafka streaming data
For additional help, visit AI Team Support.