Testing Kafka Applications: Testcontainers, Embedded Kafka, and Mocks
Choose the right Kafka testing strategy. Testcontainers for integration tests, MockProducer for unit tests, TopologyTestDriver for Streams.

I've seen test suites that take 30 minutes because every test spins up a broker. Kafka testing is tricky—messages don't fail immediately, offsets commit in the background, rebalances happen unexpectedly.
Most teams over-rely on integration tests. Start with mocks, escalate to containers only when you need actual broker behavior.
"We cut our CI time from 18 minutes to 3 by replacing Testcontainers with MockProducer for business logic tests. Integration tests stayed for the wiring."
- Senior Engineer at a logistics company

The Testing Pyramid
| Layer | Tool | Speed | Use When |
|---|---|---|---|
| Unit | MockProducer/MockConsumer | ~1ms | Serialization, routing, error handling |
| Unit (Streams) | TopologyTestDriver | ~10ms | Stream topologies |
| Integration | Testcontainers | ~5-10s | End-to-end, Schema Registry |
| Integration | @EmbeddedKafka | ~3-5s | Spring Boot tests |
Unit Testing with MockProducer
Kafka's client library includes MockProducer and MockConsumer. They're fast, deterministic, and need no external dependencies.

    @Test
    void shouldSendOrderToOrdersTopic() {
        // autoComplete=true: every send is acknowledged immediately
        MockProducer<String, String> mockProducer = new MockProducer<>(
            true, new StringSerializer(), new StringSerializer());
        OrderService service = new OrderService(mockProducer);
        service.placeOrder("order-123", "{\"item\": \"widget\"}");
        // history() lists every record handed to send(), in order
        assertEquals(1, mockProducer.history().size());
        assertEquals("orders", mockProducer.history().get(0).topic());
    }

The real value is testing failures:

    @Test
    void shouldRetryOnTransientFailure() {
        // autoComplete=false: sends stay pending until the test completes or fails them
        MockProducer<String, String> mockProducer = new MockProducer<>(
            false, new StringSerializer(), new StringSerializer());
        OrderService service = new OrderService(mockProducer);
        service.placeOrderAsync("order-123", "{}");
        // errorNext takes a RuntimeException, so this must be
        // org.apache.kafka.common.errors.TimeoutException, not the java.util.concurrent one
        mockProducer.errorNext(new TimeoutException("Broker not available"));
        assertEquals(2, mockProducer.history().size()); // Original + retry
    }

Tradeoff: Mocks don't test serialization with the broker, rebalancing, or network issues. Use them for business logic, not "does my config work?"
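The failure test above depends on one mechanism: sends are parked instead of being acknowledged, the test fails the pending one by hand, and the service's callback issues the retry. The following is a minimal JDK-only sketch of that mechanism. `FakeSender` and `RetryingOrderService` are hypothetical stand-ins written for illustration, not Kafka classes; in a real test, MockProducer with autoComplete=false plays the role of `FakeSender`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a producer whose sends finish only when the test says so. */
class FakeSender {
    private final List<String> history = new ArrayList<>();
    private final Deque<Consumer<RuntimeException>> pending = new ArrayDeque<>();

    /** Records the payload and parks the callback until completeNext or errorNext is called. */
    void send(String payload, Consumer<RuntimeException> callback) {
        history.add(payload);
        pending.addLast(callback);
    }

    /** Acknowledges the oldest pending send; returns false if nothing is pending. */
    boolean completeNext() {
        Consumer<RuntimeException> callback = pending.pollFirst();
        if (callback == null) return false;
        callback.accept(null);
        return true;
    }

    /** Fails the oldest pending send with the given error; returns false if nothing is pending. */
    boolean errorNext(RuntimeException error) {
        Consumer<RuntimeException> callback = pending.pollFirst();
        if (callback == null) return false;
        callback.accept(error);
        return true;
    }

    /** Every payload passed to send(), in order, whether or not it was acknowledged. */
    List<String> history() {
        return history;
    }
}

/** Hypothetical service that re-sends a failed payload until maxAttempts is reached. */
class RetryingOrderService {
    private final FakeSender sender;
    private final int maxAttempts;

    RetryingOrderService(FakeSender sender, int maxAttempts) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
    }

    void placeOrderAsync(String payload) {
        attempt(payload, 1);
    }

    private void attempt(String payload, int attemptNumber) {
        sender.send(payload, error -> {
            // Retry only on failure, and only while attempts remain.
            if (error != null && attemptNumber < maxAttempts) {
                attempt(payload, attemptNumber + 1);
            }
        });
    }
}
```

Because nothing completes until the test calls `errorNext` or `completeNext`, the order of events is fully under the test's control, which is what makes the retry assertion deterministic.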
Kafka Streams with TopologyTestDriver
For Streams applications, TopologyTestDriver executes your topology synchronously:

    @Test
    void shouldFilterInvalidOrders() {
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, Order> input = driver.createInputTopic("orders", ...);
            TestOutputTopic<String, Order> output = driver.createOutputTopic("valid-orders", ...);
            input.pipeInput("order-1", new Order(100));
            input.pipeInput("order-2", new Order(-50)); // Should be filtered
            assertEquals(1, output.readValuesToList().size());
        }
    }

For wall-clock punctuations, advance time manually: driver.advanceWallClockTime(Duration.ofMinutes(1)). Stream-time punctuations fire as the timestamps of the records you pipe in advance.
Integration Testing with Testcontainers
When you need actual broker behavior—rebalancing, retention, compaction, Schema Registry—use Testcontainers.

    @Testcontainers
    class OrderIntegrationTest {
        // For the apache/kafka image, use the KafkaContainer class from org.testcontainers.kafka
        @Container
        static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:3.7.0"));

        @Test
        void shouldProduceAndConsume() {
            Properties props = new Properties();
            // The mapped port is random, so always ask the container for the address
            props.put("bootstrap.servers", kafka.getBootstrapServers());
            // ... test with real broker
        }
    }

For Schema Registry, add a network:
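
    static Network network = Network.newNetwork();

    @Container
    static ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.6.0")
        .withNetwork(network).withNetworkAliases("kafka");

    @Container
    static GenericContainer<?> schemaRegistry = new GenericContainer<>("confluentinc/cp-schema-registry:7.6.0")
        .withNetwork(network)
        .withExposedPorts(8081) // default Schema Registry HTTP port
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") // required by the image
        // Assumes the broker is reachable at kafka:9092 from inside the network;
        // confirm the in-network listener for your Testcontainers version
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9092")
        .dependsOn(kafka);

Handle async properly: Use Awaitility, not Thread.sleep():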

    await().atMost(Duration.ofSeconds(10))
        .untilAsserted(() -> assertFalse(records.isEmpty()));

Spring Boot with @EmbeddedKafka
For Spring Kafka, @EmbeddedKafka provides an in-process broker:
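
    @SpringBootTest
    @EmbeddedKafka(topics = {"orders"}, bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    class OrderServiceTest {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        // Assumed: the listener bean under test (type name is illustrative),
        // exposing a count of processed messages
        @Autowired
        private OrderProcessor processor;

        @Test
        void shouldProcessOrder() throws Exception {
            // get() blocks until the broker acknowledges the send
            kafkaTemplate.send("orders", "order-123", "{}").get();
            await().until(() -> processor.getProcessedCount() > 0);
        }
    }

Common Errors
"Connection refused on random port": By default @EmbeddedKafka publishes its address as spring.embedded.kafka.brokers, which Spring Boot clients do not read. Set bootstrapServersProperty = "spring.kafka.bootstrap-servers" so they connect to the embedded broker.
"Container startup timeout": Increase the timeout with .withStartupTimeout(Duration.ofMinutes(3)), or pre-pull images in CI.
Hardcoding localhost:9092: Containers map the broker to a random host port, so always use kafka.getBootstrapServers().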
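For the last item, one way to keep addresses out of test code is to build client settings from whatever address the running broker reports. The sketch below is a hypothetical helper (`TestKafkaProps` and `consumerProps` are illustrative names, not part of any library) that uses plain string config keys, so it needs only the JDK. Setting auto.offset.reset to earliest is an added assumption about what most tests want: the Kafka default is latest, so a consumer that subscribes after the producer has already sent may otherwise never see the record.

```java
import java.util.Properties;

/** Hypothetical helper for integration tests; class and method names are illustrative. */
final class TestKafkaProps {
    private TestKafkaProps() {}

    /**
     * Builds consumer settings from the address the test broker reports at runtime,
     * for example the value returned by kafka.getBootstrapServers().
     */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must come from the running broker");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Read from the beginning so records produced before the consumer joined are still seen.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

In a Testcontainers test this might be called as TestKafkaProps.consumerProps(kafka.getBootstrapServers(), "test-" + UUID.randomUUID()), which also gives each test its own consumer group so committed offsets from one test do not leak into another.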
EmbeddedKafka vs Testcontainers
| Feature | @EmbeddedKafka | Testcontainers |
|---|---|---|
| Startup | ~3-5s | ~5-10s |
| Schema Registry | Requires mock | Real container |
| Multi-broker | Complex | Easy |
| Spring integration | Native | Manual |
Use @EmbeddedKafka for simple Spring tests and Testcontainers for Schema Registry or multi-broker scenarios.

The testing pyramid applies to Kafka. Fast unit tests catch most bugs. Integration tests verify wiring. Don't spin up a broker for every JSON serialization test.

For production verification, Conduktor Console lets you inspect topic contents and consumer behavior without writing additional test code.