Effective Testing of Kafka Streams in Spring Boot

This blog discusses testing Kafka Streams within a Spring Boot environment, focusing on how to effectively use Spring Boot's testing tools, such as the SpringExtension, TestContextManager, and custom annotations like @TopologyTest and @KafkaContainerTest. It explains how to configure these tools to create efficient, comprehensive tests with minimal boilerplate, including testing with a TopologyTestDriver and running tests against an actual Kafka cluster using TestContainers.

Key takeaway #1

Custom annotations can eliminate boilerplate and streamline Kafka Streams testing.

Key takeaway #2

A @TopologyTest annotation provides fast, in-memory unit testing. This setup automatically configures a TopologyTestDriver.

Key takeaway #3

A @KafkaContainerTest annotation enables full integration testing. This approach uses TestContainers to automatically start a real Kafka cluster.

In a previous post I described my process of setting up a stream with little to no boilerplate code. There I briefly scratched the surface of what Spring Boot offers for testing by customizing tests that run with the SpringExtension.

When SpringBootTests are sprinkled all over the test package without any regard for what is included in or excluded from the Spring Context, build pipelines can last longer than the average stage in the Tour de France. When used correctly, though, these tests can be performant and cover the boundaries of your application better than the average unit test. Let’s cover the tools available to us.

The full source code for this blog post is on GitHub.

What does spring-boot-starter-test have to offer?

This dependency includes multiple components that define what is included in a test that spins up a Spring Boot Context. Let’s go over some of the relevant ones:

SpringExtension

This extension is the bridge between JUnit and Spring. It has many responsibilities, but the one I want to focus on here is handling the TestContextManager. When a test class is executed, the extension creates a TestContextManager for it and forwards the JUnit lifecycle callbacks to that manager.

TestContextManager

As the name suggests, this component manages the TestContext. When a new instance of this class is created, it looks for the @BootstrapWith annotation on the test class. That annotation takes a TestContextBootstrapper as its value, which in turn describes how the TestContext should be created. We can pass our own implementation of a bootstrapper, but usually SpringBootTestContextBootstrapper will suffice.

Seeing that the bootstrapper is responsible for creating the TestContext, we could modify it to suit our needs, but there are many annotations that assist with that, meaning we rarely have to implement a custom bootstrapper.
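
The custom annotations later in this post rely on the fact that @BootstrapWith does not have to sit directly on the test class: it can be placed on another annotation that the test class uses. The snippet below is a plain-Java model of such a lookup, written only to illustrate the idea. It is not Spring's implementation, and Bootstrap, FastTest, DefaultBootstrapper and findBootstrapper are hypothetical names I made up for this sketch.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

class MetaAnnotationLookup {

    // Looks on the class itself first, then on the annotations of its annotations.
    static Optional<Class<?>> findBootstrapper(Class<?> type) {
        return find(type, new HashSet<>());
    }

    private static Optional<Class<?>> find(Class<?> type, Set<Class<?>> visited) {
        if (!visited.add(type)) {
            return Optional.empty(); // already seen, e.g. @Retention is annotated with itself
        }
        Bootstrap direct = type.getDeclaredAnnotation(Bootstrap.class);
        if (direct != null) {
            return Optional.of(direct.value());
        }
        for (Annotation annotation : type.getDeclaredAnnotations()) {
            Optional<Class<?>> found = find(annotation.annotationType(), visited);
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(findBootstrapper(SomeTest.class));
    }
}

// Hypothetical stand-in for Spring's @BootstrapWith.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@interface Bootstrap {
    Class<?> value();
}

// Hypothetical placeholder for a bootstrapper implementation.
class DefaultBootstrapper {
}

// Hypothetical composed annotation, playing the role @TopologyTest plays later on.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Bootstrap(DefaultBootstrapper.class)
@interface FastTest {
}

// The test class only carries the composed annotation.
@FastTest
class SomeTest {
}
```

The test class never mentions the bootstrapper, yet the lookup still finds it through the composed annotation.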

TestContext

This class is a state object that has access to the Spring ApplicationContext and to the test class and test method that are currently being executed. The TestContext is passed to every callback of a TestExecutionListener.

TestExecutionListener

This listener has access to the TestContext and supports the test lifecycle callbacks. This means that we can use reflection to set fields on the test instance, and we have access to the Spring ApplicationContext. This is ideal for setting up components that are used in test methods and that should be instantiated with components from the Spring Context, e.g. a TestInputTopic.

Components annotated with @Autowired in the test instance are decorated in this fashion. This is handled by DependencyInjectionTestExecutionListener, which is a listener that is automatically registered when a test is bootstrapped with SpringBootTestContextBootstrapper, the default bootstrapper when using @SpringBootTest.
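
To make the callback flow a bit more tangible, here is a small plain-Java model of it. It is a sketch under my own assumptions, not Spring code: Context and Listener are simplified stand-ins for TestContext and TestExecutionListener, and runTest plays the part of the TestContextManager. The model calls the "before" callbacks in registration order and the "after" callbacks in reverse order, so a listener that opens a resource closes it only after the listeners that depend on it are done.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ListenerLifecycleSketch {

    // Simplified stand-in for TestContext: the test instance plus shared attributes.
    static class Context {
        final Object testInstance;
        final Map<String, Object> attributes = new HashMap<>();

        Context(Object testInstance) {
            this.testInstance = testInstance;
        }
    }

    // Simplified stand-in for TestExecutionListener with two of its callbacks.
    interface Listener {
        default void beforeTestMethod(Context context) {
        }

        default void afterTestMethod(Context context) {
        }
    }

    // "before" callbacks run in registration order, "after" callbacks in reverse.
    static void runTest(Context context, List<Listener> listeners, Runnable testMethod) {
        for (Listener listener : listeners) {
            listener.beforeTestMethod(context);
        }
        try {
            testMethod.run();
        } finally {
            for (int i = listeners.size() - 1; i >= 0; i--) {
                listeners.get(i).afterTestMethod(context);
            }
        }
    }

    // Registers two listeners and records the order in which things happen.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Listener resourceListener = new Listener() {
            @Override
            public void beforeTestMethod(Context context) {
                context.attributes.put("resource", "opened");
                events.add("open resource");
            }

            @Override
            public void afterTestMethod(Context context) {
                context.attributes.remove("resource");
                events.add("close resource");
            }
        };
        Listener dependentListener = new Listener() {
            @Override
            public void beforeTestMethod(Context context) {
                // Reads what the first listener stored in the shared context.
                events.add("use resource: " + context.attributes.get("resource"));
            }
        };
        runTest(
            new Context(new Object()),
            List.of(resourceListener, dependentListener),
            () -> events.add("test body")
        );
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```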

@TypeExcludeFilters

When used in concert with SpringBootTestContextBootstrapper, it’s possible to create a custom TypeExcludeFilter where we can define some resources to in- or exclude for the TestContext we want to build. When using a custom annotation to group all these components, we can define which properties will be processed by the SpringBootTestContextBootstrapper.

@ContextConfiguration

One of the features of @ContextConfiguration is to define which initializers should be executed when creating the Spring Boot context. This is convenient when working with TestContainers, as we can boot them up and immediately update the corresponding configuration for the Spring Boot context.

Enough Spring Boot — back to Kafka

@TopologyTest

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(SpringExtension.class)
@BootstrapWith(SpringBootTestContextBootstrapper.class)
@TestExecutionListeners(
   listeners = {
       TopologyTestDriverTestExecutionListener.class,
       TestTopicTestExecutionListener.class
   },
   mergeMode = MergeMode.MERGE_WITH_DEFAULTS
)
@TypeExcludeFilters(TopologyTypeExcludeFilter.class)
@Import({
   TopologyConfiguration.class,
   MockAvroSerdeFactory.class
})
@ActiveProfiles("test")
public @interface TopologyTest {
   ComponentScan.Filter[] includeFilters() default {};
   ComponentScan.Filter[] excludeFilters() default {};
}

Let’s kick it off with the TopologyTest annotation. The purpose of this annotation is to enable tests to include a TopologyTestDriver simply by annotating the class with @TopologyTest. Most of these annotations should now be familiar:

  • SpringExtension: Connects JUnit to Spring
  • SpringBootTestContextBootstrapper: Out-of-the-box bootstrapper
  • @TestExecutionListeners: Listeners with test lifecycle callbacks
  • @TypeExcludeFilters: Manages what’s in the Spring context
  • @Import: Imports relevant for every @TopologyTest

TopologyTestDriverTestExecutionListener

This component manages the TopologyTestDriver. It sets it up before every test method and closes it afterwards. If the test class contains a TopologyTestDriver variable, the value will be injected into the test class.

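public void beforeTestMethod(TestContext testContext) throws Exception {
   var applicationContext = testContext.getApplicationContext();

   // Create the driver if it is not present yet and store it as a TestContext
   // attribute, so other listeners can look it up under the same key.
   var driver = testContext.computeAttribute(
       TOPOLOGY_TEST_DRIVER,
       ttd -> createTestDriver(applicationContext)
   );

   // Inject the driver into every field of type TopologyTestDriver that is
   // declared directly on the test class (inherited fields are not visited).
   for (var field : testContext.getTestClass().getDeclaredFields()) {
       if (field.getType() == TopologyTestDriver.class) {
           field.setAccessible(true);
           field.set(testContext.getTestInstance(), driver);
       }
   }
}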

The driver is saved as an attribute in the TestContext, allowing it to be accessed in other TestExecutionListeners. This is demonstrated in TestTopicTestExecutionListener, where the driver is used to create TestInput and TestOutput topics dynamically, which are then injected into the test instance. This enables us to write tests with less boilerplate code and get right into the actual test.
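
The annotation-driven injection described above can be sketched in plain Java as follows. This is not the code of TestTopicTestExecutionListener: TestTopic is re-declared here as a hypothetical look-alike, FakeInputTopic stands in for TestInputTopic so the sketch runs without Kafka on the classpath, and the topicFactory parameter takes the place of whatever the real listener does with the driver it reads from the TestContext.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.function.Function;

class TopicInjectionSketch {

    // Hypothetical look-alike of the @TestTopic annotation used in this post.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface TestTopic {
        String value();
    }

    // Stand-in for TestInputTopic, only remembers the topic name.
    static class FakeInputTopic {
        final String name;

        FakeInputTopic(String name) {
            this.name = name;
        }
    }

    // Sets every @TestTopic field of the supported type on the test instance.
    static void injectTopics(Object testInstance, Function<String, FakeInputTopic> topicFactory) {
        for (Field field : testInstance.getClass().getDeclaredFields()) {
            TestTopic annotation = field.getAnnotation(TestTopic.class);
            if (annotation == null || field.getType() != FakeInputTopic.class) {
                continue; // not a field this sketch knows how to fill
            }
            try {
                field.setAccessible(true);
                field.set(testInstance, topicFactory.apply(annotation.value()));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Could not inject " + field.getName(), e);
            }
        }
    }

    // Plays the role of a test class.
    static class ExampleTest {
        @TestTopic("orders")
        private FakeInputTopic orders;

        private FakeInputTopic notAnnotated;

        String ordersName() {
            return orders == null ? null : orders.name;
        }

        boolean hasUnannotatedTopic() {
            return notAnnotated != null;
        }
    }

    public static void main(String[] args) {
        ExampleTest test = new ExampleTest();
        injectTopics(test, FakeInputTopic::new);
        System.out.println(test.ordersName());
    }
}
```

Only the annotated field is filled, so a test class opts in per field and the topic name lives right next to the field that uses it.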

TopologyTypeExcludeFilter

Here we override the exclude method to make sure certain classes are never included in the TopologyTest:

  • KafkaConfig: This class contains the @EnableKafkaStreams annotation and would trigger some autoconfigurations regarding stream setup that we do not want to include in the test context, as we are handling that ourselves.
  • KafkaPropertiesAvroSerdeFactory: This component creates Avro serdes that are expected to connect to an actual Schema Registry. We’ll exclude this one and later include one that connects to the MockSchemaRegistry.
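
The decision such a filter makes can be modelled in a few lines of plain Java. This is a simplified sketch rather than the actual TopologyTypeExcludeFilter: the class ExcludeFilterSketch and its isExcluded method are hypothetical, and the fully qualified class names in the demo are assumptions, since the packages of KafkaConfig and the other classes are not shown in this post.

```java
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class ExcludeFilterSketch {

    private final List<Pattern> includePatterns;
    private final Set<String> alwaysExcluded;

    ExcludeFilterSketch(List<String> includeRegexes, Set<String> alwaysExcluded) {
        this.includePatterns = includeRegexes.stream()
            .map(Pattern::compile)
            .collect(Collectors.toList());
        this.alwaysExcluded = alwaysExcluded;
    }

    // Returns true when the class should be kept out of the test context.
    boolean isExcluded(String className) {
        if (alwaysExcluded.contains(className)) {
            return true; // explicit exclusions win over include filters
        }
        // Anything that no include filter asks for stays out as well.
        return includePatterns.stream()
            .noneMatch(pattern -> pattern.matcher(className).matches());
    }

    public static void main(String[] args) {
        // The class names below are assumed for the sake of the demo.
        ExcludeFilterSketch filter = new ExcludeFilterSketch(
            List.of("eu.cymo.kafka_streams_demo.adapter.kafka.*"),
            Set.of("eu.cymo.kafka_streams_demo.adapter.kafka.KafkaConfig")
        );
        System.out.println(filter.isExcluded("eu.cymo.kafka_streams_demo.adapter.kafka.KafkaConfig"));
        System.out.println(filter.isExcluded("eu.cymo.kafka_streams_demo.adapter.kafka.OrdersTopology"));
    }
}
```

Letting the explicit exclusions win means a broad include pattern on the test class cannot accidentally pull KafkaConfig back into the context.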

TopologyConfiguration

This class makes sure that a Topology is included in the Spring Context. We need to do some manual setup here because @EnableKafkaStreams has been excluded. This Topology is used by the TopologyTestDriverTestExecutionListener to create the TopologyTestDriver.

MockAvroSerdeFactory

This class contains an implementation of AvroSerdeFactory which supplies Avro serdes that connect to a MockSchemaRegistryClient.

TopologyTestDriverTest

When everything is used together, we can write tests with minimal setup: a TopologyTestDriver is ready to use, and everything the SpringExtension already offers, such as mocking beans, remains available.

@TopologyTest(
   includeFilters = {
       @ComponentScan.Filter(
           type = REGEX,
           pattern = { "eu.cymo.kafka_streams_demo.adapter.kafka.*" }
       )
   }
)
class TopologyTestDriverTest {

   @TestTopic("orders")
   private TestInputTopic<String, OrderCreated> orders;

   @TestTopic("reseller_orders_count")
   private TestOutputTopic<String, Long> resellerOrdersCount;

   @Test
   void countsOrdersForResellers() {
       // ...
   }
}

@KafkaContainerTest

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(SpringExtension.class)
@TestExecutionListeners(
   listeners = {
       ProducerTestExecutionListener.class,
       ConsumerTestExecutionListener.class
   },
   mergeMode = MergeMode.MERGE_WITH_DEFAULTS
)
@BootstrapWith(KafkaContainerTestContextBootstraper.class)
@TypeExcludeFilters(KafkaContainerTestExcludeFilter.class)
@ContextConfiguration(
   initializers = { KafkaContainerInitializer.class }
)
@Import({
   MockAvroSerdeFactory.class,
   TopicInitializer.class
})
@ActiveProfiles("test")
public @interface KafkaContainerTest {
   ComponentScan.Filter[] includeFilters() default {};
   ComponentScan.Filter[] excludeFilters() default {};
   String[] properties() default {};
}

This annotation enables tests to be run with an actual Kafka cluster using TestContainers. TopologyTestDriver is a great tool to test streams, but sometimes we need an actual cluster to add value where the TopologyTestDriver is lacking.

For example, the driver executes a topology as if it only had one partition, so issues related to how keys are spread across partitions wouldn’t be revealed by the test.
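
A tiny model shows why a single partition hides this kind of problem. On a real cluster, the partition of a keyed record is derived from a hash of its key, so records with different keys can end up on different partitions and be processed by different stream tasks. The sketch below is an assumption-laden illustration: Kafka's default partitioner hashes the serialized key with murmur2, while this example uses String.hashCode() to stay free of dependencies, and PartitionSketch is a hypothetical name.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class PartitionSketch {

    // Simplified partitioner: hash of the key modulo the number of partitions.
    // Kafka's default uses murmur2 on the serialized key, not String.hashCode().
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Collects the distinct partitions a set of keys would be written to.
    static Set<Integer> partitionsUsed(List<String> keys, int numPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (String key : keys) {
            used.add(partitionFor(key, numPartitions));
        }
        return used;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");
        System.out.println("1 partition:  " + partitionsUsed(keys, 1));
        System.out.println("4 partitions: " + partitionsUsed(keys, 4));
    }
}
```

With one partition every key lands in the same place, so a topology that forgets to repartition after changing keys would still appear to work. With several partitions the same mistake can split one logical key across tasks.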

KafkaContainerInitializer

What sets this annotation apart from @TopologyTest is @ContextConfiguration. We use it to register a custom initializer that sets up a KafkaContainer and injects the connection properties into the Spring Context.

public class KafkaContainerInitializer
       implements ApplicationContextInitializer<ConfigurableApplicationContext> {
   static final KafkaContainer KAFKA_CONTAINER;
   static {
       KAFKA_CONTAINER = new KafkaContainer(
           DockerImageName.parse("confluentinc/cp-kafka:7.6.1")
       );
       KAFKA_CONTAINER.start();
   }

   @Override
   public void initialize(ConfigurableApplicationContext applicationContext) {
       applicationContext.getEnvironment()
           .getPropertySources()
           .addFirst(
               new MapPropertySource(
                   "test-containers",
                   Map.of(
                       "spring.kafka.bootstrap-servers",
                       KAFKA_CONTAINER.getBootstrapServers(),
                       "spring.kafka.properties.security.protocol",
                       "PLAINTEXT"
                   )
               )
           );
   }
}
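
The initializer uses addFirst because property sources are searched in order and the first one that contains a key wins. The following plain-Java model illustrates that precedence rule. It is a sketch, not Spring's property source implementation, and the class name as well as the host and port values are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

class PropertyPrecedenceSketch {

    // Sources are searched front to back, so earlier sources take precedence.
    private final Deque<Map<String, String>> sources = new ArrayDeque<>();

    void addFirst(Map<String, String> source) {
        sources.addFirst(source);
    }

    void addLast(Map<String, String> source) {
        sources.addLast(source);
    }

    // Returns the value from the first source that knows the key, or null.
    String getProperty(String key) {
        for (Map<String, String> source : sources) {
            if (source.containsKey(key)) {
                return source.get(key);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PropertyPrecedenceSketch environment = new PropertyPrecedenceSketch();
        // Value as it might come from the application's own configuration (assumed).
        environment.addLast(Map.of("spring.kafka.bootstrap-servers", "localhost:9092"));
        // Value reported by the started container (made-up port).
        environment.addFirst(Map.of("spring.kafka.bootstrap-servers", "localhost:55001"));
        System.out.println(environment.getProperty("spring.kafka.bootstrap-servers"));
    }
}
```

Because the container's address is only known after the container has started, overriding the property at context initialization is a natural fit.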

TopicInitializer

This class allows us to create new topics by bridging the gap between configuration and the Spring Kafka NewTopics component. This way we can configure which topics need to be created and with how many partitions. Multiple profiles could be used to test different topic configurations if required.

@TestExecutionListeners

The KafkaContainerTest annotation includes TestExecutionListeners that inject Kafka producers and consumers to facilitate test setup, so no boilerplate code has to be written for this.

KafkaTestContainerTest

When everything is used together, tests could look like this:

@KafkaContainerTest(
   includeFilters = {
       @ComponentScan.Filter(
           type = REGEX,
           pattern = { "eu.cymo.kafka_streams_demo.adapter.kafka.*" }
       )
   }
)
public class KafkaTestContainerTest {
   @TestConsumer(topic = "reseller_orders_count")
   private ConsumerAssert<String, Long> consumer;
   private Producer<String, OrderCreated> producer;

   @Test
   void countsOrdersForResellers() {
       // ...
   }
}

Be aware, though, that with this annotation the same Kafka container is used across multiple tests. To keep your tests deterministic, choose keys carefully when producing messages: use random UUIDs instead of hard-coding “key-1” and “key-2”, so the same key is not accidentally reused across tests. Also, when using consumers, a topic might already contain data when you do assertions, so be sure to filter the consumed records appropriately.
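
As a minimal sketch of that advice, the helper below generates a fresh key per test and filters consumed records down to that key before asserting. It deliberately avoids the Kafka client API: records are modelled as plain Map.Entry pairs, and IsolatedKeysSketch, uniqueKey and valuesForKey are hypothetical names, not part of the project's test support.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

class IsolatedKeysSketch {

    // A fresh key per test keeps records from other tests out of the assertions.
    static String uniqueKey() {
        return UUID.randomUUID().toString();
    }

    // Keeps only the values of records that were produced under the given key.
    static List<Long> valuesForKey(List<Map.Entry<String, Long>> consumed, String key) {
        return consumed.stream()
            .filter(entry -> entry.getKey().equals(key))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String myKey = uniqueKey();
        String keyFromOtherTest = uniqueKey();

        // The topic may already hold records that another test produced.
        List<Map.Entry<String, Long>> consumed = List.of(
            Map.entry(keyFromOtherTest, 7L),
            Map.entry(myKey, 1L),
            Map.entry(myKey, 2L)
        );

        System.out.println(valuesForKey(consumed, myKey));
    }
}
```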

Also, the Kafka container will be reused across multiple Spring contexts because it is defined statically. Either make sure that all your @KafkaContainerTest tests run in the same Spring context, or have every Spring context spin up its own Kafka container.

Work with the framework

I always advocate that it’s best to work with your framework, and a big part of that is knowing what tools are available. For this blog, that has been the SpringExtension. We have seen how to customize the Spring context and when to use which annotation to manipulate that context. When possible, use JUnit extensions or TestExecutionListeners to avoid repeating the same setup code across multiple tests. And most importantly, we’ve covered how we can create a TopologyTestDriver and connect a test to an actual Kafka cluster using the SpringExtension.

Happy testing.
