Stream Real-Time WebSocket Data to Apache Kafka with Our New Source Connector

An open-source Kafka Connect connector for WebSocket streams. Auto-reconnect, auth support, and configurable buffering. Get started in 5 minutes.

Stéphane Derosiaux · February 24, 2026

Kafka Connect has connectors for databases, files, cloud services, and message queues. But if you wanted to pipe a WebSocket stream into Kafka, you had two options: write custom code or pay for a commercial connector. Most teams wrote the custom code. Then they maintained the reconnection logic, the auth handling, and the 3 AM debugging when messages stopped flowing.

We got tired of watching teams solve the same problem. So we built an open-source WebSocket source connector for Kafka Connect and are releasing it today.

WebSocket connector architecture: A WebSocket API (like Coinbase) streams ticker, trades, and order book data over wss:// to the WebSocket Source Connector in Kafka Connect, which writes messages to a Kafka topic. The connector auto-reconnects on disconnect.

What it does

The connector is Apache-licensed. Point it at any WebSocket endpoint, give it a Kafka topic, and it streams messages in: wss://ws-feed.exchange.coinbase.com for Bitcoin trades, your IoT platform's WebSocket API for sensor data, or Binance and Discord feeds. It uses standard Kafka Connect configuration, nothing custom.

What you get:

  • Automatic reconnection with configurable intervals
  • Bearer token and custom header authentication
  • Subscription messages for exchanges like Binance and Coinbase
  • Configurable in-memory buffering for bursty traffic
  • JMX metrics for connection health and throughput

Quick start: running in 5 minutes

You need Docker and Docker Compose. We'll spin up Kafka, Kafka Connect, and Conduktor Console (a web UI for viewing topics), then deploy the connector to stream live Coinbase trades.

Step 1: Clone and build

git clone https://github.com/conduktor/kafka-connect-websocket.git
cd kafka-connect-websocket
mvn clean package -DskipTests

No local Java? Use Docker instead:

docker run --rm -v "$(pwd)":/app -w /app maven:3.9-eclipse-temurin-17 mvn clean package -DskipTests

Step 2: Start the stack

cd examples
docker compose up -d

This starts:

  • Kafka (KRaft mode) on port 9092
  • Kafka Connect on port 8083
  • Conduktor Console on port 8080 for viewing topics

Wait about 30 seconds for everything to initialize. Check that Kafka Connect is up:

curl http://localhost:8083/

You should get back something like:

{"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"5L6g3nShT-eMCtK--X86sw"}

Step 3: Deploy the WebSocket connector

Deploy a connector that subscribes to Coinbase BTC-USD ticker updates:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "coinbase-btcusd",
    "config": {
      "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
      "tasks.max": "1",
      "websocket.url": "wss://ws-feed.exchange.coinbase.com",
      "kafka.topic": "coinbase-btcusd-trades",
      "websocket.subscription.message": "{\"type\":\"subscribe\",\"channels\":[{\"name\":\"ticker\",\"product_ids\":[\"BTC-USD\"]}]}"
    }
  }'

The connector opens a WebSocket to Coinbase, sends the subscription message, and starts forwarding ticker updates.
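Hand-escaping the nested subscription JSON in the curl command is easy to get wrong. Here is a sketch of the same deployment from Python: json.dumps produces the escaped subscription string, and GET /connectors/<name>/status (a standard Kafka Connect REST endpoint) confirms the connector and its task are RUNNING. The helper names are ours, and the network calls are left as comments so the block runs offline.

```python
import json
import urllib.request
from typing import List, Optional


def coinbase_payload(name: str, topic: str, product_ids: List[str]) -> dict:
    """Build the POST /connectors body; the subscription is serialized to a string."""
    subscription = {
        "type": "subscribe",
        "channels": [{"name": "ticker", "product_ids": product_ids}],
    }
    return {
        "name": name,
        "config": {
            "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
            "tasks.max": "1",
            "websocket.url": "wss://ws-feed.exchange.coinbase.com",
            "kafka.topic": topic,
            # The config value is a JSON *string*, so the inner object is dumped first.
            "websocket.subscription.message": json.dumps(subscription, separators=(",", ":")),
        },
    }


def http_json(method: str, url: str, body: Optional[dict] = None) -> dict:
    """Send a JSON request to the Kafka Connect REST API and decode the reply."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(url, data=data, method=method,
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def is_running(status: dict) -> bool:
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (status.get("connector", {}).get("state") == "RUNNING"
            and len(tasks) > 0
            and all(t.get("state") == "RUNNING" for t in tasks))


payload = coinbase_payload("coinbase-btcusd", "coinbase-btcusd-trades", ["BTC-USD"])
# Against the running stack:
#   http_json("POST", "http://localhost:8083/connectors", payload)
#   is_running(http_json("GET", "http://localhost:8083/connectors/coinbase-btcusd/status"))
```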

Step 4: See the data

Open Conduktor Console at http://localhost:8080 and navigate to the coinbase-btcusd-trades topic. You should see BTC-USD ticker updates arriving in real time.

Or consume via the command line:

docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic coinbase-btcusd-trades \
  --from-beginning \
  --max-messages 3

The first message is a subscription confirmation from Coinbase. After that, each message is a JSON ticker event:

{"type":"ticker","sequence":124104291706,"product_id":"BTC-USD","price":"70672.78","open_24h":"72422.56","volume_24h":"6951.60","low_24h":"70236.02","high_24h":"72869.07","volume_30d":"302213.79","best_bid":"70672.77","best_bid_size":"0.00002426","best_ask":"70672.78","best_ask_size":"0.79504402","side":"buy","time":"2026-03-14T14:49:14.639873Z","trade_id":980607675,"last_size":"0.00034376"}
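Downstream code has to tell the subscription confirmation apart from ticker events. The sketch below works on a record value that has already been read from the topic (the Kafka client is left out). It uses field names from the sample event above and parses prices with Decimal, because Coinbase sends them as strings. The function name is ours.

```python
import json
from decimal import Decimal
from typing import Optional


def ticker_spread(raw: str) -> Optional[Decimal]:
    """Return best_ask - best_bid for a ticker event, or None for other message types."""
    event = json.loads(raw)
    if event.get("type") != "ticker":
        return None  # e.g. the initial subscription confirmation
    # Prices arrive as JSON strings; Decimal keeps them exact (no float rounding).
    return Decimal(event["best_ask"]) - Decimal(event["best_bid"])


# Abridged version of the sample ticker event above.
sample = ('{"type":"ticker","product_id":"BTC-USD","price":"70672.78",'
          '"best_bid":"70672.77","best_ask":"70672.78"}')
spread = ticker_spread(sample)  # Decimal('0.01')
```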

Live market data, in Kafka, with zero custom code.

To connect your own WebSocket API, copy the connector config and change the URL and topic. For authenticated endpoints, see the configuration section below.

Configuration

Besides the standard connector.class setting, the minimum configuration is two fields:

Parameter | Description | Example
websocket.url | WebSocket endpoint (ws:// or wss://) | wss://stream.example.com/feed
kafka.topic | Destination Kafka topic | my-websocket-data

That's enough for a public WebSocket. Most production scenarios need more.
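Before POSTing a config, a quick client-side sanity check can catch typos. This is our own helper, not the connector's validation (the connector validates its config itself). The rules mirror this article: the required fields, a ws:// or wss:// URL, and a tasks.max of 1.

```python
from typing import List
from urllib.parse import urlparse


def check_config(config: dict) -> List[str]:
    """Return a list of problems in a connector config (an empty list means it looks OK)."""
    problems = []
    for key in ("connector.class", "websocket.url", "kafka.topic"):
        if not config.get(key):
            problems.append(f"missing {key}")
    url = config.get("websocket.url", "")
    if url and urlparse(url).scheme not in ("ws", "wss"):
        problems.append("websocket.url must start with ws:// or wss://")
    # tasks.max defaults to 1 when omitted; anything else is flagged.
    if str(config.get("tasks.max", "1")) != "1":
        problems.append("tasks.max should be 1 (one task per WebSocket stream)")
    return problems


problems = check_config({
    "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
    "websocket.url": "wss://stream.example.com/feed",
    "kafka.topic": "my-websocket-data",
})  # [] for the minimal example from the table
```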

Authentication

For protected APIs, use bearer token authentication:

{
  "name": "authenticated-feed",
  "config": {
    "connector.class": "io.conduktor.connect.websocket.WebSocketSourceConnector",
    "tasks.max": "1",
    "websocket.url": "wss://api.example.com/stream",
    "kafka.topic": "example-stream",
    "websocket.auth.token": "your-bearer-token-here"
  }
}

The connector sends an Authorization: Bearer <token> header, with the value of websocket.auth.token as the token, in the WebSocket handshake request.

For custom headers, use websocket.headers:

{
  "websocket.headers": "X-API-Key:abc123,X-Client-ID:myapp"
}

The format is key1:value1,key2:value2 for multiple headers. You can combine both:

{
  "websocket.auth.token": "bearer-token",
  "websocket.headers": "X-Client-ID:myapp,X-Version:2"
}
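Here is a sketch of how the two settings could combine into handshake headers, following the documented key1:value1,key2:value2 format. Three behaviors are our own assumptions rather than confirmed connector behavior: splitting on the first colon, trimming whitespace, and letting websocket.auth.token win over an Authorization entry in websocket.headers. Check the connector source if you rely on any of these edge cases.

```python
from typing import Dict, Optional


def handshake_headers(auth_token: Optional[str], headers_spec: Optional[str]) -> Dict[str, str]:
    """Build handshake headers from websocket.auth.token and websocket.headers (sketch)."""
    result: Dict[str, str] = {}
    if headers_spec:
        for pair in headers_spec.split(","):
            # Assumption: split on the first colon only, so values may contain ':'.
            key, sep, value = pair.partition(":")
            if not sep or not key.strip():
                raise ValueError(f"malformed header entry: {pair!r}")
            result[key.strip()] = value.strip()
    if auth_token:
        # Assumption: the bearer token overrides any Authorization given in the header list.
        result["Authorization"] = f"Bearer {auth_token}"
    return result


headers = handshake_headers("bearer-token", "X-Client-ID:myapp,X-Version:2")
# {'X-Client-ID': 'myapp', 'X-Version': '2', 'Authorization': 'Bearer bearer-token'}
```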

Subscription messages

Many WebSocket APIs, especially crypto exchanges, require a subscription message after connecting:

{
  "websocket.subscription.message": "{\"type\":\"subscribe\",\"channels\":[\"ticker\"]}"
}

This JSON is sent immediately after the WebSocket connection opens. Binance, Coinbase, and Kraken all use this pattern to specify which streams you want.

Message format

The connector preserves WebSocket messages as-is in Kafka record values:

  • Text messages become strings (UTF-8)
  • Binary messages become byte arrays (preserved exactly)
  • Record key is always null (WebSocket has no key concept)
  • Headers are empty (WebSocket messages don't carry headers)

You can apply Kafka Connect SMTs to parse JSON, extract fields, or route messages based on content.
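On the consumer side, a record value may hold JSON text, plain text, or raw binary. The sketch below assumes the worker's converters write each message payload to the topic unchanged (text as UTF-8, binary as raw bytes). A converter that wraps values in an envelope would need unwrapping first. The function name is ours.

```python
import json
from typing import Any


def decode_value(raw: bytes) -> Any:
    """Turn a raw Kafka record value back into parsed JSON, text, or untouched bytes."""
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw  # binary WebSocket message: keep the bytes exactly
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text  # text message that isn't JSON
```

This kind of dispatch is also where you would route by a field such as "type" if you prefer to do it in a consumer rather than with an SMT.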

Reconnection

Parameter | Default | Description
websocket.reconnect.enabled | true | Automatically reconnect on disconnect
websocket.reconnect.interval.ms | 5000 | Delay between reconnection attempts, in milliseconds (5 seconds)

When the WebSocket disconnects, whether from a server restart, network issue, or idle timeout, the connector waits the configured interval and reconnects. It resends the subscription message if one was configured.
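The reconnect behavior can be sketched as a loop: connect, resend the subscription, forward messages until the socket drops, wait a fixed interval, and repeat. This is an illustration only, not the connector's Java implementation. The Connection interface, the max_attempts cap, and the fake connection are hypothetical stand-ins that let the loop run without a network.

```python
import time
from typing import Callable, Iterator, Optional, Protocol


class Connection(Protocol):
    """Hypothetical minimal client interface assumed by this sketch."""
    def send(self, text: str) -> None: ...
    def messages(self) -> Iterator[str]: ...


def run_with_reconnect(open_connection: Callable[[], Connection],
                       on_message: Callable[[str], None],
                       subscription: Optional[str] = None,
                       reconnect_enabled: bool = True,
                       interval_ms: int = 5000,
                       max_attempts: Optional[int] = None,
                       sleep: Callable[[float], None] = time.sleep) -> int:
    """Connect, (re)send the subscription, forward messages; retry at a fixed interval."""
    attempts = 0
    while True:
        attempts += 1
        try:
            conn = open_connection()
            if subscription is not None:
                conn.send(subscription)  # resent on every (re)connect
            for message in conn.messages():
                on_message(message)
        except ConnectionError:
            pass  # server restart, network issue, idle timeout...
        if not reconnect_enabled or (max_attempts is not None and attempts >= max_attempts):
            return attempts
        sleep(interval_ms / 1000)  # fixed interval, no backoff in this sketch


# Offline demo: a fake connection that delivers two messages, then drops.
sent, received, sleeps = [], [], []


class _FlakyConnection:
    def send(self, text: str) -> None:
        sent.append(text)

    def messages(self) -> Iterator[str]:
        yield "m1"
        yield "m2"
        raise ConnectionError("socket closed")


attempts = run_with_reconnect(_FlakyConnection, received.append,
                              subscription='{"type":"subscribe"}',
                              max_attempts=3, sleep=sleeps.append)
```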

Performance tuning

Parameter | Default | Description
websocket.message.queue.size | 10000 | Capacity of the in-memory message buffer (messages)
websocket.connection.timeout.ms | 30000 | Connection timeout, in milliseconds (30 seconds)

The queue buffers messages between the WebSocket and Kafka Connect's polling loop. Increase it for bursty traffic.

When the queue fills, the connector drops incoming messages until space opens. Monitor the MessagesDropped JMX metric. If you see drops, increase websocket.message.queue.size or tune your Kafka producer settings for higher throughput.
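The drop-when-full policy described above looks roughly like this: the WebSocket side offers messages without blocking, and the poll loop drains them in batches. The class, method names, and max_batch parameter are hypothetical. The counter only mimics the role of the MessagesDropped metric; it is not the connector's JMX bean.

```python
import queue
from typing import List


class DroppingBuffer:
    """Bounded buffer: the WebSocket side offers, the poll loop drains."""

    def __init__(self, capacity: int = 10000) -> None:
        self._queue: "queue.Queue[str]" = queue.Queue(maxsize=capacity)
        self.messages_dropped = 0  # plays the role of the MessagesDropped metric

    def offer(self, message: str) -> bool:
        """Enqueue without blocking; count and discard the message if the buffer is full."""
        try:
            self._queue.put_nowait(message)
            return True
        except queue.Full:
            self.messages_dropped += 1
            return False

    def poll(self, max_batch: int = 500) -> List[str]:
        """Drain up to max_batch messages without blocking."""
        batch: List[str] = []
        while len(batch) < max_batch:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch


buf = DroppingBuffer(capacity=3)
for i in range(5):
    buf.offer(f"msg-{i}")
first = buf.poll()  # ['msg-0', 'msg-1', 'msg-2']; buf.messages_dropped == 2
```

Dropping the newest messages keeps the WebSocket reader from ever blocking, which protects the connection at the cost of completeness.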

Task configuration

Set tasks.max to 1. A WebSocket stream is one ordered sequence of messages over one persistent connection, so there is nothing to split: you cannot parallelize a single WebSocket stream across multiple tasks.

For multiple WebSocket streams, deploy multiple connector instances, one per stream.

Delivery guarantees

The connector provides at-most-once delivery. WebSocket is a fire-and-forget protocol with no replay capability. Messages are lost in two cases: when the internal queue fills (monitor MessagesDropped), and during reconnection windows when the server keeps sending while the connector is disconnected. For use cases requiring completeness, pair with periodic REST API snapshots to detect and fill gaps.
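One simple way to decide when a REST snapshot is needed is to flag intervals where the topic went silent for longer than expected. Reconnection windows show up this way. This heuristic is our own, not a connector feature. It assumes Coinbase-style "time" fields like the sample event above, and the threshold is arbitrary.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def parse_time(ts: str) -> datetime:
    """Parse timestamps such as 2026-03-14T14:49:14.639873Z (naive datetime, UTC)."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")


def find_gaps(timestamps: List[str], max_silence: timedelta) -> List[Tuple[datetime, datetime]]:
    """Return (last_seen, next_seen) pairs where no message arrived for longer than max_silence."""
    times = sorted(parse_time(t) for t in timestamps)
    return [(a, b) for a, b in zip(times, times[1:]) if b - a > max_silence]


gaps = find_gaps(
    ["2026-03-14T14:49:14.639873Z",
     "2026-03-14T14:49:15.100000Z",
     "2026-03-14T14:49:27.250000Z"],  # ~12 s of silence, e.g. a reconnect window
    max_silence=timedelta(seconds=5),
)
# one gap: 14:49:15.100000 -> 14:49:27.250000, a window to backfill from a snapshot
```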

The full configuration reference is in the project's configuration documentation.

Use cases

The most obvious fit is crypto and financial markets. Stream price feeds, order books, and trade executions from Coinbase, Binance, Kraken, or Gemini. Coinbase's WebSocket feed covers every trading pair with ticker, order book, and match data. One connector instance per stream, each writing to its own Kafka topic.

IoT platforms with WebSocket feeds work the same way: temperature readings, GPS coordinates, machine status, all into Kafka for stream processing and alerting.

Social and communication platforms push updates over WebSockets. Discord's Gateway API, Bluesky's firehose, Reddit's real-time feeds. Pipe them into Kafka for sentiment analysis, content moderation, or engagement analytics. Game backends, observability platforms, anything that speaks WebSocket works the same way.

Getting started

There are three paths, depending on what you need.

Docker Compose quickstart (try it out). Run the Docker Compose example from earlier in this article. Full Kafka stack and the connector running in minutes. See the examples directory.

Install from release (production). Download the pre-built JAR from GitHub releases:

wget https://github.com/conduktor/kafka-connect-websocket/releases/download/v1.0.0/kafka-connect-websocket-1.0.0-jar-with-dependencies.jar

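Copy it to your Kafka Connect plugins directory. The parent directory ($KAFKA_HOME/plugins below) must be listed in the worker's plugin.path setting: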

mkdir -p $KAFKA_HOME/plugins/kafka-connect-websocket
cp kafka-connect-websocket-1.0.0-jar-with-dependencies.jar $KAFKA_HOME/plugins/kafka-connect-websocket/

Restart Kafka Connect and the plugin becomes available. Deploy connectors via the REST API as shown in the quickstart.

If the connector doesn't appear in the plugin list (curl http://localhost:8083/connector-plugins), verify that the JAR is in the plugins directory, that this directory is covered by plugin.path, and that Kafka Connect has read permission on the file.
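In a deployment script, the same check can run automatically. GET /connector-plugins returns a JSON array of objects with class and type fields, among others. The sketch below looks for the connector class. The helper names are ours, and the example response is illustrative and abridged.

```python
import json
import urllib.request
from typing import List

CONNECTOR_CLASS = "io.conduktor.connect.websocket.WebSocketSourceConnector"


def list_plugins(base_url: str = "http://localhost:8083") -> List[dict]:
    """GET /connector-plugins from a Kafka Connect worker."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins", timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


def has_plugin(plugins: List[dict], class_name: str = CONNECTOR_CLASS) -> bool:
    """True if the worker lists the given connector class."""
    return any(p.get("class") == class_name for p in plugins)


# Illustrative, abridged response shape; on a live worker use has_plugin(list_plugins()).
example = [{"class": CONNECTOR_CLASS, "type": "source"}]
found = has_plugin(example)  # True
```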

Build from source (contributors). Clone and build:

git clone https://github.com/conduktor/kafka-connect-websocket.git
cd kafka-connect-websocket
mvn clean package

The output JAR with dependencies will be in target/. Requires Java 11+ and Maven 3.6+.

Links

The project is Apache License 2.0. Bug reports, feature requests, and PRs are welcome. See CONTRIBUTING.md for guidelines.


github.com/conduktor/kafka-connect-websocket