Why Kafka Connect?

While you can always write your own connector to move data from Kafka to S3 or a database, for example with confluent-kafka-python, such custom code can be hard to maintain and error-prone. Kafka Connect simplifies this task.

In this post we will …

  • … set up a local Kafka cluster, S3 storage & Kafka Connect with Docker Compose
  • … create a Kafka topic and publish messages to it
  • … use Kafka Connect to create an S3 sink connector and write the messages to S3

Run the Example

You can run the full example described in this post by executing the script below. It downloads the necessary files, starts the containers, creates a Kafka topic, publishes messages, and creates an S3 sink connector that writes the data to S3:

#!/usr/bin/env zsh

curl https://gist.githubusercontent.com/layandreas/ac955778ad97b55112301e5efaf07e60/raw/5d065594619bd998c7ebb39ce1c91518ef25272c/kafka-connect-self-contained-example.sh > kafka-connect-self-contained-example.sh

zsh kafka-connect-self-contained-example.sh
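
If you want to check that all services came up, you can list the running containers. This assumes the script created the kafka-connect-example directory, the same directory used in the cleanup step below:

cd kafka-connect-example
docker compose ps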

You can clean up the containers afterwards by running:

#!/usr/bin/env zsh

cd kafka-connect-example
docker compose down -v

Setup

Prerequisite: Download S3 Sink Connector

First we need to download the Confluent Kafka Connect S3 Sink Connector:

#!/usr/bin/env zsh

mkdir kafka-connect-example
cd kafka-connect-example
mkdir kafka-connect-plugins

curl -O https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-s3/versions/10.6.7/confluentinc-kafka-connect-s3-10.6.7.zip

unzip confluentinc-kafka-connect-s3-10.6.7.zip -d ./kafka-connect-plugins/confluentinc-kafka-connect-s3

The kafka-connect-plugins directory will be mounted into the Kafka Connect container so that it can load the S3 sink connector.
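
To sanity-check the download, you can look for the connector jars inside the plugin directory; you should see a number of jar files:

find kafka-connect-plugins -name "*.jar" | head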

Start Containers & Create S3 Bucket

Let’s download the docker-compose.yml (shown in full at the end of this post) and start the containers:

curl https://gist.githubusercontent.com/layandreas/ffb57ec5f102ed0d285c1e8e40838e1a/raw/be4dd51730c931085c58f9c00c761840d188326b/docker-compose.yml > docker-compose.yml

docker compose up
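
Since docker compose up runs in the foreground, run the following steps in a separate terminal. Kafka Connect can take a minute or two to become ready; one simple way to wait for it is to poll its REST endpoint, which returns version information once the worker is up:

until curl -s http://localhost:8083/ > /dev/null; do
  echo "Waiting for Kafka Connect..."
  sleep 5
done
echo "Kafka Connect is up."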

Once the containers are running, create the S3 bucket to which we will write our data:

docker exec minio mc alias set local http://localhost:9000 minioadmin minioadmin && \
  docker exec minio mc mb local/my-bucket --ignore-existing
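
You can verify that the bucket exists by listing the buckets with the MinIO client inside the container:

docker exec minio mc ls local/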

Kafka: Create Topic & Publish Messages

After our containers have started we first need to create a Kafka topic:

#!/usr/bin/env zsh

# Create the Kafka topic to which we will send our messages
docker exec kafka kafka-topics --create \
  --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
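
To confirm the topic was created, you can describe it:

docker exec kafka kafka-topics --describe \
  --topic my-topic \
  --bootstrap-server localhost:9092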

Now we’re ready to publish some messages:

#!/usr/bin/env zsh

# Publish messages to Kafka topic
for i in {1..10}; do
  echo "Sending message $i..."
  docker exec -i kafka kafka-console-producer --broker-list kafka:9092 --topic my-topic <<EOF
{"id":$i,"name":"Alice","email":"alice@example.com"}
EOF
done

echo "✅ Done sending 10 messages."

You can go to http://localhost:8080/ui/clusters/local/all-topics/my-topic/messages to open the Provectus Kafka UI. You should see the messages we have just published:

Kafka UI: Published Messages

Kafka Connect: Create S3 Sink Connector

Next we will create an S3 sink connector:

#!/usr/bin/env zsh

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "store.url": "http://minio:9000",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.compatibility": "NONE",
    "aws.access.key.id": "minioadmin",
    "aws.secret.access.key": "minioadmin",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}' http://localhost:8083/connectors

The connector reads messages from the my-topic topic and writes them to the MinIO bucket my-bucket in JSON format. With flush.size set to 3, it flushes a new file to S3 every 3 messages.
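
You can check via the Kafka Connect REST API that the connector and its task are running; the status endpoint should report RUNNING for both:

# Check that the connector and its task are RUNNING
curl -s http://localhost:8083/connectors/s3-sink/status

# List all installed connector plugins (the S3 sink connector should appear here)
curl -s http://localhost:8083/connector-plugins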

Check the Results

You can go to the MinIO web console at http://localhost:9001/browser/my-bucket and log in with minioadmin as both username and password. You should see the bucket my-bucket with the files written to it by Kafka Connect:

Minio Bucket

Each file will contain 3 messages in JSON Lines format:

{"name":"Alice","id":70,"email":"alice@example.com"}
{"name":"Alice","id":71,"email":"alice@example.com"}
{"name":"Alice","id":72,"email":"alice@example.com"}

Full Docker Compose File

docker-compose.yml:

version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  minio:
    image: minio/minio
    container_name: minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio-data:/data

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.4.0
    container_name: kafka-connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
      CONNECT_LOG4J_LOGGERS: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # S3 / MinIO access configuration. The same credentials and endpoint are
      # also passed explicitly in the connector configuration, which is what
      # lets the S3 sink connector talk to MinIO.
      AWS_ACCESS_KEY_ID: minioadmin
      AWS_SECRET_ACCESS_KEY: minioadmin
      AWS_ENDPOINT_OVERRIDE: http://minio:9000
    volumes:
      - ./kafka-connect-plugins:/etc/kafka-connect/jars
    depends_on:
      - kafka
      - minio

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
      - zookeeper

volumes:
  minio-data: