Send Data from Edge Delta to Snowflake via Kafka

Configure Edge Delta to stream data into Snowflake through Kafka (Snowpipe Streaming), or to deliver it in batches via S3 + Snowpipe.

Overview

There is no dedicated Snowflake destination node. Instead, you can deliver Edge Delta data to Snowflake in two supported ways:

  • Streaming (recommended): Edge Delta sends data to Kafka, where the Kafka Connect Snowflake Sink connector consumes it and ingests it into Snowflake using Snowpipe Streaming.
  • Batch (alternative): Edge Delta sends archives to S3 with the S3 destination node, and a Snowpipe on the bucket automatically loads new files into Snowflake. See Prepare to send items to an S3 destination.

In the streaming pattern, the Edge Delta agent sends data to a Kafka topic using the Kafka destination node. Kafka Connect then writes those events to Snowflake in near real time.

  • Low latency: seconds‑level availability via Snowpipe Streaming.
  • Flexible data: logs, metrics, signals, cluster patterns & samples, and custom data.
  • Scale: works with self‑managed Kafka or AWS MSK.

Prerequisites

  • Kafka cluster (self‑hosted or MSK) and Kafka Connect with the Snowflake Kafka Connector JARs available.
  • Snowflake account with privileges to create roles, users, DB/schema, and tables.
  • (Optional demo) Docker for a local Kafka/ZooKeeper/Kafka Connect stack.

Step 1: Prepare Snowflake

Create the database/schema and (optionally) a target table.

CREATE DATABASE TEST_DATABASE;
USE DATABASE TEST_DATABASE;
USE SCHEMA PUBLIC;

-- Optional: pre-create the target table. If it does not exist, the connector creates it
-- with two VARIANT columns, RECORD_METADATA and RECORD_CONTENT.
-- CREATE OR REPLACE TABLE KAFKA_DEMO_TABLE (RECORD_METADATA VARIANT, RECORD_CONTENT VARIANT);

SELECT * FROM KAFKA_DEMO_TABLE; -- sanity check later

Create a connector role/user and grant privileges:

USE ROLE ACCOUNTADMIN;

CREATE ROLE KAFKA_CONNECTOR_ROLE;

GRANT ALL PRIVILEGES ON DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE TABLE ON SCHEMA TEST_DATABASE.PUBLIC TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON ALL SCHEMAS IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON ALL TABLES IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON FUTURE SCHEMAS IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT ALL PRIVILEGES ON FUTURE TABLES IN DATABASE TEST_DATABASE TO ROLE KAFKA_CONNECTOR_ROLE;

CREATE USER KAFKA_CONNECTOR_USER
  PASSWORD = 'StrongPassword123'
  MUST_CHANGE_PASSWORD = FALSE
  DEFAULT_ROLE = KAFKA_CONNECTOR_ROLE
  COMMENT = 'User for Kafka -> Snowflake connector';

GRANT ROLE KAFKA_CONNECTOR_ROLE TO USER KAFKA_CONNECTOR_USER;

SHOW GRANTS TO ROLE KAFKA_CONNECTOR_ROLE;

Add an RSA public key for the connector user

On a secure workstation:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Paste the public key into Snowflake:

ALTER USER KAFKA_CONNECTOR_USER
SET RSA_PUBLIC_KEY='-----BEGIN PUBLIC KEY-----
MIIB...AB
-----END PUBLIC KEY-----';

Extract the private key body (no header/footer, no newlines) for the connector config:

sed -n '/-----BEGIN PRIVATE KEY-----/,/-----END PRIVATE KEY-----/p' rsa_key.p8 \
  | sed '1d;$d' | tr -d '\n'

Security note: Treat the private key as a secret. Use a secrets manager in production.

Step 2: Run Kafka, ZooKeeper, and Kafka Connect (Docker demo)

This docker-compose.yml exposes Kafka inside the network at kafka:29092 and on the host at localhost:9092. Kafka Connect mounts ./jars for the Snowflake connector JARs.

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INSIDE_DOCKER://0.0.0.0:29092,OUTSIDE_DOCKER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INSIDE_DOCKER://kafka:29092,OUTSIDE_DOCKER://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE_DOCKER:PLAINTEXT,OUTSIDE_DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE_DOCKER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    container_name: kafka_connect
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka_connect"
      CONNECT_GROUP_ID: "1"
      CONNECT_CONFIG_STORAGE_TOPIC: "docker-connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "docker-connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "docker-connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - ./jars:/etc/kafka-connect/jars

Bring it up and confirm:

docker compose up -d
docker container ls
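
Once the containers are running, you can also confirm that Kafka Connect picked up the Snowflake connector JARs mounted from ./jars (assuming the Connect REST API is reachable on localhost:8083):

curl -s http://localhost:8083/connector-plugins | grep -i snowflake

The output should include com.snowflake.kafka.connector.SnowflakeSinkConnector; if it is empty, check the contents of ./jars and restart the kafka-connect container.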

Step 3: Create the Snowflake Sink Connector

Create connector_config.json with your Snowflake details and the private key you extracted above:

{
  "name": "snowflake-connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "topics": "test-topic",

    "snowflake.url.name": "xma45480.snowflakecomputing.com",
    "snowflake.user.name": "KAFKA_CONNECTOR_USER",
    "snowflake.user.role": "KAFKA_CONNECTOR_ROLE",
    "snowflake.private.key": "<PASTE_PRIVATE_KEY_NO_HEADERS>",
    "snowflake.database.name": "TEST_DATABASE",
    "snowflake.schema.name": "PUBLIC",
    "snowflake.table.name": "KAFKA_DEMO_TABLE",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "buffer.count.records": "10000",
    "buffer.flush.time": "60",
    "buffer.size.bytes": "5242880"
  }
}

Register the connector:

curl -i -X POST -H "Content-Type: application/json" \
  --data @connector_config.json \
  http://localhost:8083/connectors

A 201 Created response indicates the connector was registered.
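
As an additional check, list the registered connectors; the response should include snowflake-connector:

curl -s http://localhost:8083/connectors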

Step 4: Configure the Edge Delta Agent (Kafka)

Use the pipeline builder to add a Kafka destination node, then route the data you want (logs, metrics, signals, cluster patterns & samples, custom).

Minimal example (local demo)

nodes:
  - name: kafka-main
    type: kafka_output
    endpoint: localhost:9092
    topic: test-topic

Secure example (TLS + SASL)

nodes:
  - name: kafka-secure
    type: kafka_output
    endpoint: broker1:9093,broker2:9093
    topic: prod-edgedelta
    tls:
      ignore_certificate_check: false
      ca_file: "/etc/edgedelta/ca.pem"
      crt_file: "/etc/edgedelta/cert.pem"
      key_file: "/etc/edgedelta/key.pem"
      client_auth_type: "requestclientcert"
      min_version: "TLSv1_2"
      max_version: "TLSv1_3"
    sasl:
      username: "edgedelta-user"
      password: "edgedelta-pass"
      mechanism: "scram-sha-512"

Step 5: Validate End‑to‑End

Kafka topic (quick peek):

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning

You should see JSON events from Edge Delta.
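
If the Kafka CLI tools are not installed on the host, the same check can be run inside the demo broker container (named kafka in the compose file); --max-messages keeps the output short:

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic test-topic --from-beginning --max-messages 5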

Snowflake table:

USE DATABASE TEST_DATABASE;
USE SCHEMA PUBLIC;
SELECT * FROM KAFKA_DEMO_TABLE LIMIT 50;

Rows should start appearing within seconds of events landing in Kafka.
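
If you let the connector create the table, the payload lands in a RECORD_CONTENT VARIANT column alongside RECORD_METADATA (topic, partition, offset, and so on), so a more targeted query looks like:

SELECT RECORD_METADATA:topic::STRING AS topic, RECORD_CONTENT
FROM KAFKA_DEMO_TABLE
LIMIT 10;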

Alternative: Batch via S3 + Snowpipe (High Level)

If seconds‑level recency isn’t required, you can flush archives to S3 using the S3 destination node, then configure a Snowpipe on that bucket to auto‑ingest. Before configuring your node, follow the S3 preparation steps in Send Data to S3.

nodes:
  - name: s3-archive
    type: s3_output
    bucket: "<YOUR_BUCKET>"
    region: "<YOUR_REGION>"
    encoding: parquet
    compression: gzip
    use_native_compression: true
    path_prefix:
      order:
        - Year
        - Month
        - Day
        - Hour
        - Minute
        - tag
        - host
      format: "ver=parquet/year=%s/month=%s/day=%s/hour=%s/min=%s/tag=%s/host=%s/"

Troubleshooting

Connector not creating/writing the table

  • Confirm KAFKA_CONNECTOR_ROLE grants (CREATE TABLE on the schema; privileges on database/schema/tables).
  • Ensure snowflake.table.name is correct.

Auth failures

  • Verify the RSA public key was set on KAFKA_CONNECTOR_USER and that the private key in connector_config.json is a single line with no headers/footers.

No data in Snowflake

  • Check Kafka topic has messages.
  • Inspect connector status:
      curl http://localhost:8083/connectors/snowflake-connector/status
  • Review Kafka Connect logs for connector errors.

Throughput/latency tuning

  • Tune connector buffers (buffer.count.records, buffer.flush.time, buffer.size.bytes).
  • Tune the Edge Delta Kafka node (batch_size, batch_bytes, batch_timeout); see the sketch below.
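
As a concrete illustration of the Edge Delta side, here is the demo node from Step 4 with hypothetical batching values added; the option names come from the bullet above and the numbers are placeholders to tune for your volume, not recommendations:

nodes:
  - name: kafka-main
    type: kafka_output
    endpoint: localhost:9092
    topic: test-topic
    batch_size: 10000        # records per batch
    batch_bytes: 10000000    # max batch size in bytes
    batch_timeout: 30s       # flush even if the batch is not yet full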

Production Hardening

Enable TLS and SASL for Kafka and keep all secrets in a secrets manager. Run Kafka Connect in a high-availability configuration with persistent volumes, and monitor task status and connector lag continuously. For reliability at scale, prefer a managed Kafka service such as AWS MSK.
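
For the Docker demo above, the broker-facing security settings would be layered onto the kafka-connect service as worker properties; the cp-kafka-connect image maps CONNECT_* environment variables to those properties. An illustrative sketch with placeholder secret values to be injected from your secrets manager:

    environment:
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_MECHANISM: "SCRAM-SHA-512"
      CONNECT_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="connect-user" password="<FROM_SECRETS_MANAGER>";'
      CONNECT_SSL_TRUSTSTORE_LOCATION: "/etc/kafka/secrets/truststore.jks"
      CONNECT_SSL_TRUSTSTORE_PASSWORD: "<FROM_SECRETS_MANAGER>"

The connector's embedded consumer may need matching CONNECT_CONSUMER_* overrides as well.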

For higher throughput, partition topics appropriately and increase tasks.max to match. Settle on a consistent event schema, either standardized JSON fields or Snowflake's VARIANT type, for downstream analytics. Finally, monitor Snowflake credit consumption and warehouse usage to keep costs under control.
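
As one concrete example of the partitioning point, the demo topic can be created up front with several partitions so additional connector tasks have work to share (illustrative values, run against the demo broker container):

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 \
  --create --topic test-topic --partitions 4 --replication-factor 1

Keep tasks.max at or below the partition count; sink tasks beyond that number sit idle.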