Testing your Apache Kafka Data with Confidence (2023)

Testing your Apache Kafka Data with Confidence (2023)

In this post we'll explore some methods you can use to test components within your Apache Kafka architecture.

James White

Oct 19, 2023

Testing your Apache Kafka Data with Confidence (2023)
Testing your Apache Kafka Data with Confidence (2023)
Testing your Apache Kafka Data with Confidence (2023)

In this post, we’ll explore some methods you can use to test components within your Apache Kafka architecture.

Note this is an update to the original article I wrote in May last year.

The Case for Quality

The significance of ‘data’ and ‘events’ in modern systems cannot be refuted. As the production of data has grown exponentially, organisations have shifted towards architectures that can more effectively process large volumes of data with millisecond latency.

“The amount of data generated by IoT devices is expected to reach 73.1 ZB (zettabytes) by 2025.” - source

We refer to these methods of system design as EDAs (event-driven architectures), or microservices architecture. At their core, they depend on a messaging backbone, which is often chosen to be Apache Kafka. This messaging backbone is responsible for delivering information between interlinked services. It also helps to decouple components within the system, empowering teams to own, scale and improve their ‘piece’ of the overall system with less dependencies.

With Kafka forming a fundamental part of systems that are often responsible for real-time decision making, it’s more important than ever to establish a robust testing framework. Such a framework will ensure you maintain product quality, reduce the risk of broken contracts, and can even eliminate the possibility of a production failure. But with distributed architectures and huge volumes of data, this can be a daunting task for even the most seasoned Apache Kafka expert. In this post, we’ll explore some methods you can use to test components within your Apache Kafka architecture.

Manual Testing

Suppose we want to collect geospatial data from those multibillion IoT devices previously mentioned. The first step would be to create a Kafka topic and validate that we can produce and consume messages from it accordingly.

Kafka CLI

Let’s assume we’ve already started Kafka. If you’re unsure how to do this, I recommend visiting Conduktor Kafkademy for step-by-step instructions. Using the Kafka Topics CLI, we can create a geo_events topic with 3 partitions and a replication factor of 1 via a Kafka broker running at localhost:9092.

$ kafka-topics.sh --bootstrap-server localhost:9092 --topic geo_events --create --partitions 3 --replication-factor 1

Great, now that our Kafka topic is created we can produce a sample record as a manual test. To do this, we utilize the Kafka producer CLI:

$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic geo_events
> {"timestamp":"1647515491", "deviceId":"1234-e4b6-5678", "latLong":"38.8951,-77.0364"}

Finally, let’s use the Kafka consumer CLI to read data from our topic, confirming that we can collect our geospatial messages. We will use the --from-beginning option to consume all historical messages, which should include the sample produced in our prior step.

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic geo_events --from-beginning
> {"timestamp":"1647515491","deviceId":"1234-e4b6-5678", "latlong":"38.8951,-77.0364"}

Job done! To summarise, we’ve created a topic and manually tested that we can produce and consume data accordingly. That being said, the keyword here is ‘manual’. This might be sufficient for ad-hoc debugging, but not for something continuous and automated. Additionally, writing those commands was cumbersome for such basic functionality. Surely this can be achieved more easily?

Kafka Consoles

Not all users that need to interact with and test Apache Kafka data are comfortable utilizing CLI tools. Perhaps I’m a data analyst that needs only to sample some messages, or a QA engineer with no prior Kafka experience. In these cases, a console (GUI) is your friend. Equally, they can increase the productivity of the most seasoned developers when it comes to everyday Kafka tasks.

The Conduktor Console has a rich feature set to support manual testing. To revisit our previous example, below demonstrates how to use the producer interface to push messages into the geo_events topic.

Not only can we forget about the Kafka CLI syntax, but we also gain new functionality such as the ‘Flow’ mode. This enables you to produce an automated stream of events with interval rules and lifecycle options. Already, you get an idea of how much more powerful our manual testing efforts can be.

The best part, we needn’t create any additional scripts. By simply navigating the options in the console, we can:

  • append headers to our messages

  • produce randomized data (that can also be inferred from your schemas)

  • configure settings such as acks (to ensure data is safely replicated when producing).

This supports testing under more technical conditions. Pretty neat, right?

As for the consumption side of manual testing, there are complementary features whereby you might otherwise have to refer to a Kafka CLI cheat sheet.

For example:

  • configuring different consumption strategies

  • limiting the records you consume

  • filtering your messages to intercept those that match certain criteria

Most importantly, your topic data will be displayed in an easily readable table that also surfaces record metadata.

While Kafka consoles help advance our manual testing efforts, there are limits to how far we can go. Most organizations are now dependent on continuous integration and continuous delivery, therefore we need a mechanism for automated testing that can form part of the SDLC (software development lifecycle).

Integration Testing with Testcontainers

Advancing from our manual testing efforts, Testcontainers is a Java library that can be used to instantiate disposable Kafka containers via Docker. They provide ports for other languages such as Python and Go, and containers for other common databases such as Postgres and MySQL.

One of the challenges we face with integration testing is ensuring that all dependencies are available for the test to run. Essentially, we want to replicate a ‘realistic’ environment for the test execution, and we want it to be available on demand. This is particularly relevant when integrating tests in our CI pipeline.

For distributed systems dependent on Kafka, Testcontainers helps us spin up a cluster for the lifetime of an integration test. This means that Kafka is disposed of when the execution is completed. To instantiate our Kafka container we can parse a Confluent docker image.

    DockerImageName KAFKA_TEST_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:6.2.1");
    KafkaContainer kafkaContainer = new KafkaContainer(KAFKA_TEST_IMAGE);

Once our Kafka broker is up and running, we can execute our integration tests. Safe in the knowledge that we have a self-contained environment that’s not being shared with other teams.

In a single script, it’s now possible to automate creating our geo_events topic, producing some sample records, consuming them, and asserting the data. You can find an example class in the Testcontainers repository. Below is an adaptation for our use case.

    @Test
    public void testUsage() throws Exception {
      try (KafkaContainer kafka = new KafkaContainer(KAFKA_TEST_IMAGE)) {
        kafka.start();
        testKafkaFunctionality(kafka.getBootstrapServers());
      }
    }
    protected void testKafkaFunctionality
    (String bootstrapServers) throws Exception {
      testKafkaFunctionality(bootstrapServers, 1, 1);
    }
    protected void testKafkaFunctionality
    (String bootstrapServers, int partitions, int rf) throws Exception {
      try (AdminClient adminClient = AdminClient.create(ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); KafkaProducer < String, String > producer = new KafkaProducer < > (ImmutableMap.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()), new StringSerializer(), new StringSerializer()); KafkaConsumer < String, String > consumer = new KafkaConsumer < > (ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), new StringDeserializer(), new StringDeserializer());) {
        String topicName = "geo_events";
        String messageValue = "{\"timestamp\":\"1647515491\",\"deviceId\":\"1234-e4b6-5678\",\"latLong\":\"38.8951,-77.0364\"}";
        Collection < NewTopic > topics = singletonList(new NewTopic(topicName, partitions, (short) rf));
        adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);
        consumer.subscribe(singletonList(topicName));
        producer.send(new ProducerRecord < > (topicName, null, messageValue)).get();
        Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
          ConsumerRecords < String,
          String > records = consumer.poll(Duration.ofMillis(100));
          if (records.isEmpty()) {
            return false;
          }
          assertThat(records).hasSize(1).extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value).containsExactly(tuple(topicName, null, messageValue));
          return true;
        });
        consumer.unsubscribe();
      }
    }

There are clear benefits of using Testcontainers for Kafka. Primarily, it provides an on-demand testing environment that we do not need to maintain or govern. Additionally, we can embed testing in the software development lifecycle. This ensures that test executions are a pre-requisite to code being deployed.

For all the positives, potential organizational challenges come with utilizing Testcontainers. It’s a very developer-oriented approach, which assumes a deep understanding of Kafka and its architecture.

The best testing cultures encourage co-operation across product managers, data analysts, QA engineers, and developers. With no UI to support understanding, reporting, or orchestration of tests, this could reflect a technical barrier to adoption en masse.

Kafka Streams Test Utilities

Kafka Streams is a frequent choice for real-time stream processing. It’s a service where business logic will likely be injected into Kafka-based applications. For example, a stateless operation like filtering a stream or a stateful operation like a windowed aggregation. To support the testing of Streams applications, Kafka provides a test-utils artifact.

The test-utils package provides a TopologyTestDriver that can be used pipe data through a Topology

… You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records - Source

To elaborate on the official Kafka documentation, the package enables us to produce messages to a mocked input topic, apply our computational logic (Topology), and check the result in a mocked output topic. The advantage here? We don’t need to depend on an embedded or external Kafka cluster.

Let’s revisit our geospatial data, and see how we can test a simple Streams application. Suppose we want to filter the geo_events topic for messages containing a specific deviceId.

    public static Topology
    filterStream() {
      StreamsBuilder builder = new StreamsBuilder();
      KStream < String, String > stream = builder.stream(INPUT_TOPIC);
      stream.filter((k, v) -> v.contains("1234-e4b6-5678")).to(OUTPUT_TOPIC);
      return builder.build();
    }

Using the above topology to filter our stream, we can create a test case that will pipe data in from two different deviceId’s. We will then assert the outputs, validating that only the expected output, messageValid, reaches the outputTopic.

    @Test
    public void
    shouldFilterRecords() {
      topology = App.filterStream();
      td = new TopologyTestDriver(topology, config);
      inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
      outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
      String messageValid = "{\"timestamp\":\"1647515491\",\"deviceId\":\"1234-e4b6-5678\",\"latLong\":\"38.8951,-77.0364\"}";
      String messageInvalid = "{\"timestamp\":\"1647799800\",\"deviceId\":\"9876-e1p3-6763\",\"latLong\":\"51.5072,0.1276\"}";
      assertThat(outputTopic.isEmpty(), is(true));
      inputTopic.pipeInput(null, messageValid);
      assertThat(outputTopic.readValue(), equalTo(messageValid));
      assertThat(outputTopic.isEmpty(), is(true));
      inputTopic.pipeInput(null, messageInvalid);
      assertThat(outputTopic.isEmpty(), is(true));
    }

Great, so we have demonstrated that our filter logic behaves as expected. In the first case, our message contains the desirable deviceId, and we assert that it reaches the outputTopic. In the second case, when piping messageInvalid into our topology, we can verify that no message is produced in outputTopic.

The lightweight and readable nature of test-utils make the package a great asset for unit testing Kafka Streams applications. The provided input is processed synchronously and without external system dependencies. This makes it very straightforward to confirm your Topology is producing the desired results.

However, there is one major drawback. Because the process does not spin up a real Kafka, working with external applications to test dependencies is impossible. This limits how far our testing efforts can go with this method alone.

Chaos Engineering

While we have explored some structured approaches to testing streaming applications, the realities of operating distributed systems are often less predictable.

One alternative methodology, Chaos Engineering, has gained momentum in recent years. Initially championed by Netflix, the practice is focused on testing system stability through enforced failures such as terminated instances.

But should this principle also be applied to Kafka? As with any other complex system, we desire reliability and fault-tolerance. At the same time, many bad things can happen with Kafka:

  • Brokers can become unavailable and slow; how will applications react?

  • Erroneous or duplicate messages will inevitably arrive; what will happen?

This could impact loss of business, negative publicity, and breach of SLAs. It’s safe to say that Kafka passes the sniff test for chaos engineering.

Operating Kafka in production brings enough challenges, meaning few have adopted a strategy for running chaos experiments. However, new tooling is helping to make the process accessible, enabling teams to simulate production-grade issues to validate streaming application resilience.

Conduktor brings a chaos engineering solution to Kafka via their Kafka proxy: Conduktor Gateway. This proxy sits between your client applications and existing Kafka clusters, and can be used to inject chaos simulations to test how your application responds to specific events. Once Gateway is configured, chaos experiments can be deployed through the Console or via REST APIs.

The configuration in the above example is defined as:

  • rateInPercent: The percentage of requests that will result in a broker not available response

  • errorMap: Map of ApiKeys and Error you want to response

'config':
  {
    'rateInPercent': 30,
    'errorMap': { 'FETCH': 'UNKNOWN_SERVER_ERROR', 'PRODUCE': 'CORRUPT_MESSAGE' }
  }

As well as simulating broken brokers, additional chaos simulations exist for:

  • Message corruption (erroneous messages)

  • Duplicate messages

  • Invalid schemaId

  • Leader election errors

  • Slow brokers

  • Slow producers & consumers

Unlike other strategies this article explores, this brings a unique approach to validating your Kafka application resilience. The goal should be to regularly inject chaos into your systems, continuously learning and promoting a culture of resilience.

Conclusion

As we have demonstrated, testing different components within the Apache Kafka ecosystem is more than possible. At times, it does feel like the process is manual, fragmented, or even incomplete. It comes with a degree of learning and ramp-up to familiarise yourself with the different approaches.

Business people, QA engineers, and developers must work together to build a robust testing culture. These minds think differently; while an engineer might be concerned about working code, a product manager is more likely to consider the end users’ experience.

Conduktor's free console provides the most complete interface for troubleshooting, testing, and monitoring your streaming applications. Combine this with Conduktor Gateway and bring chaos engineering tactics to supercharge your resilience strategy.