Send Data from Edge Delta to Snowflake via Kafka
Overview
There is no dedicated Snowflake destination node. Instead, you can deliver Edge Delta data to Snowflake in two supported ways:
- Streaming (recommended): Edge Delta sends data to Kafka, which is then consumed by Kafka Connect Snowflake Sink and ingested into Snowflake using Snowpipe Streaming.
- Batch (alternative): Edge Delta sends data to S3, and Snowpipe automatically ingests the files into Snowflake. You can configure Edge Delta to send archives to S3 with the S3 destination node and set up Snowpipe to auto-load new files. See Prepare to send items to an S3 destination.
In the streaming pattern, the Edge Delta agent sends data to a Kafka topic using the Kafka destination node. Kafka Connect then writes those events to Snowflake in near real time.
- Low latency: seconds‑level availability via Snowpipe Streaming.
- Flexible data: logs, metrics, signals, cluster patterns & samples, and custom data.
- Scale: works with self‑managed Kafka or AWS MSK.
Prerequisites
- Kafka cluster (self‑hosted or MSK) and Kafka Connect with the Snowflake Kafka Connector JARs available.
- Snowflake account with privileges to create roles, users, DB/schema, and tables.
- (Optional demo) Docker for a local Kafka/ZooKeeper/Kafka Connect stack.
Step 1: Prepare Snowflake
Create the database/schema and (optionally) a target table.
CREATE DATABASE TEST_DATABASE;
USE DATABASE TEST_DATABASE;
USE SCHEMA PUBLIC;
-- Optional: schemaless variant table for quick testing
-- CREATE OR REPLACE TABLE KAFKA_DEMO_TABLE (DATA VARIANT);
SELECT * FROM KAFKA_DEMO_TABLE; -- sanity check later
Create a connector role/user and grant privileges:
USE ROLE ACCOUNTADMIN;
CREATE ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE TABLE ON SCHEMA TEST_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON ALL SCHEMAS IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON ALL TABLES IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON FUTURE SCHEMAS IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON FUTURE TABLES IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
CREATE USER KAFKA_CONNECTOR_USER
PASSWORD = 'StrongPassword123'
MUST_CHANGE_PASSWORD = FALSE
DEFAULT_ROLE = KAFKA_CONNECTOR_ROLE
COMMENT = 'User for Kafka -> Snowflake connector';
GRANT ROLE KAFKA_CONNECTOR_ROLE TO USER KAFKA_CONNECTOR_USER;
SHOW GRANTS TO ROLE KAFKA_CONNECTOR_ROLE;
Add an RSA public key for the connector user
On a secure workstation:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Paste the public key into Snowflake:
ALTER USER KAFKA_CONNECTOR_USER
SET RSA_PUBLIC_KEY='-----BEGIN PUBLIC KEY-----
MIIB...AB
-----END PUBLIC KEY-----';
Extract the private key body (no header/footer, no newlines) for the connector config:
sed -n '/-----BEGIN PRIVATE KEY-----/,/-----END PRIVATE KEY-----/p' rsa_key.p8 | sed '1d;$d' | tr -d '\n'
Security note: Treat the private key as a secret. Use a secrets manager in production.
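If you use AWS, one way to keep the key out of config files is to store it in a secrets manager. A minimal sketch with AWS Secrets Manager (the secret name edgedelta/snowflake-private-key is illustrative, and the AWS CLI must already be configured):
# Store the PKCS#8 private key as a secret (example name).
aws secretsmanager create-secret \
  --name edgedelta/snowflake-private-key \
  --secret-string file://rsa_key.p8
# Later, retrieve it and strip the header/footer/newlines for the connector config.
aws secretsmanager get-secret-value \
  --secret-id edgedelta/snowflake-private-key \
  --query SecretString --output text | sed '1d;$d' | tr -d '\n'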
Step 2: Run Kafka, ZooKeeper, and Kafka Connect (Docker demo)
This docker-compose.yml exposes Kafka inside the Docker network at kafka:29092 and on the host at localhost:9092. Kafka Connect mounts ./jars for the Snowflake connector JARs.
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INSIDE_DOCKER://0.0.0.0:29092,OUTSIDE_DOCKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INSIDE_DOCKER://kafka:29092,OUTSIDE_DOCKER://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE_DOCKER:PLAINTEXT,OUTSIDE_DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE_DOCKER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    container_name: kafka_connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka_connect"
      CONNECT_GROUP_ID: "1"
      CONNECT_CONFIG_STORAGE_TOPIC: "docker-connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "docker-connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "docker-connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - ./jars:/etc/kafka-connect/jars
Bring it up and confirm:
docker compose up -d
docker container ls
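Before continuing, you can optionally confirm that the Connect REST API is reachable (it can take a minute to start), that the Snowflake connector JARs in ./jars were loaded, and that the topic exists. These commands assume the Docker demo above:
# The Snowflake sink should appear in the plugin list once the JARs are picked up.
curl -s http://localhost:8083/connector-plugins | grep -i snowflake
# Optionally pre-create the topic the Edge Delta agent will write to.
docker exec kafka kafka-topics --bootstrap-server kafka:29092 \
  --create --topic test-topic --partitions 1 --replication-factor 1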
Step 3: Create the Snowflake Sink Connector
Create connector_config.json with your Snowflake details and the private key you extracted above:
{
  "name": "snowflake-connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "snowflake.url.name": "xma45480.snowflakecomputing.com",
    "snowflake.user.name": "KAFKA_CONNECTOR_USER",
    "snowflake.user.role": "KAFKA_CONNECTOR_ROLE",
    "snowflake.private.key": "<PASTE_PRIVATE_KEY_NO_HEADERS>",
    "snowflake.database.name": "TEST_DATABASE",
    "snowflake.schema.name": "PUBLIC",
    "snowflake.table.name": "KAFKA_DEMO_TABLE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "buffer.count.records": "10000",
    "buffer.flush.time": "60",
    "buffer.size.bytes": "5242880"
  }
}
Register the connector:
curl -i -X POST -H "Content-Type: application/json" --data @connector_config.json http://localhost:8083/connectors
A 201 Created response indicates the connector was registered.
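To double-check, list the registered connectors and confirm the connector and its task report RUNNING:
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/snowflake-connector/status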
Step 4: Configure the Edge Delta Agent (Kafka)
Use the pipeline builder to add a Kafka destination node, then route the data you want (logs, metrics, signals, cluster patterns & samples, custom).
Minimal example (local demo)
nodes:
  - name: kafka-main
    type: kafka_output
    endpoint: localhost:9092
    topic: test-topic
Secure example (TLS + SASL)
nodes:
  - name: kafka-secure
    type: kafka_output
    endpoint: broker1:9093,broker2:9093
    topic: prod-edgedelta
    tls:
      ignore_certificate_check: false
      ca_file: "/etc/edgedelta/ca.pem"
      crt_file: "/etc/edgedelta/cert.pem"
      key_file: "/etc/edgedelta/key.pem"
      client_auth_type: "requestclientcert"
      min_version: "TLSv1_2"
      max_version: "TLSv1_3"
    sasl:
      username: "edgedelta-user"
      password: "edgedelta-pass"
      mechanism: "scram-sha-512"
Step 5: Validate End‑to‑End
Kafka topic (quick peek):
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning
You should see JSON events from Edge Delta.
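If nothing shows up, you can isolate the connector from the agent by producing a hand-written record straight to the topic. This assumes the Docker demo from Step 2; the payload is an arbitrary example:
echo '{"source":"manual-test","message":"hello snowflake"}' | \
  docker exec -i kafka kafka-console-producer \
  --bootstrap-server kafka:29092 --topic test-topic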
Snowflake table:
USE DATABASE TEST_DATABASE;
USE SCHEMA PUBLIC;
SELECT * FROM KAFKA_DEMO_TABLE LIMIT 50;
Rows should start appearing within seconds of events landing in Kafka.
Alternative: Batch via S3 + Snowpipe (High Level)
If seconds‑level recency isn’t required, you can flush archives to S3 using the S3 destination node, then configure a Snowpipe on that bucket to auto‑ingest. Before configuring your node, follow the S3 preparation steps in Send Data to S3.
nodes:
  - name: s3-archive
    type: s3_output
    bucket: "<YOUR_BUCKET>"
    region: "<YOUR_REGION>"
    encoding: parquet
    compression: gzip
    use_native_compression: true
    path_prefix:
      order:
        - Year
        - Month
        - Day
        - Hour
        - Minute
        - tag
        - host
      format: "ver=parquet/year=%s/month=%s/day=%s/hour=%s/min=%s/tag=%s/host=%s/"
Troubleshooting
Connector not creating/writing the table
- Confirm KAFKA_CONNECTOR_ROLE grants (CREATE TABLE on the schema; privileges on database/schema/tables).
- Ensure snowflake.table.name is correct.
Auth failures
- Verify the RSA public key was set on KAFKA_CONNECTOR_USER and that the private key in connector_config.json is a single line with no headers/footers.
No data in Snowflake
- Check Kafka topic has messages.
- Inspect connector status:
curl http://localhost:8083/connectors/snowflake-connector/status
- Review Kafka Connect logs for connector errors.
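For the Docker demo above, a quick way to surface connector errors in the Connect logs (container name matches the compose file):
docker logs kafka_connect --tail 200 2>&1 | grep -iE "error|exception"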
Throughput/latency tuning
- Tune connector buffers (buffer.count.records, buffer.flush.time, buffer.size.bytes); see the example below for updating a running connector.
- Tune the Edge Delta Kafka node (batch_size, batch_bytes, batch_timeout).
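Buffer settings can be changed without recreating the connector by PUTting the full config map back to the Connect REST API. A sketch that reuses connector_config.json (jq is assumed to be installed; edit the buffer.* values in the file first):
jq .config connector_config.json | \
  curl -s -X PUT -H "Content-Type: application/json" \
  --data @- http://localhost:8083/connectors/snowflake-connector/config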
Production Hardening
Enable TLS and SASL for Kafka and store all secrets in a secrets manager. Run Kafka Connect in a high-availability configuration with persistent volumes, and continuously monitor task status and connector lag. For reliability at scale, consider a managed Kafka service such as AWS MSK.
To achieve higher throughput, partition topics appropriately and increase tasks.max as needed. You should also decide on a consistent event schema, either standardized JSON fields or Snowflake's VARIANT type, for downstream analytics. Finally, monitor Snowflake credit consumption and warehouse usage to manage costs effectively.
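For example, topic partitions can be increased (they can only grow) to allow more parallel connector tasks. The broker address and partition count below are illustrative, and a secured cluster will also need a client properties file via --command-config:
kafka-topics --bootstrap-server broker1:9093 \
  --alter --topic prod-edgedelta --partitions 6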