Kafka Schema Registry & Avro: Integration Test

Introduction

This article demonstrates using integration tests with the Kafka Schema Registry & Avro demo application. Integration tests are one of a number of types of test that are essential in ensuring the correctness and quality of an application. The demo application integrates with a schema registry, so this integration must be tested. Spring Boot provides a framework for running integration tests that includes the ability to stub third party resources, such as the schema registry, with a wiremock.

The demo application is covered in the Kafka Schema Registry & Avro: Spring Boot Demo article.

The source code for the Spring Boot demo is available here:

https://github.com/lydtechconsulting/kafka-schema-registry-avro/tree/v1.0.0

Integration Tests

Overview

While unit tests exercise individual units of work, often at the class or method level, integration tests exercise the application's end to end execution flows. Like unit tests, these integration tests live close to the code, providing fast feedback to the developer as they write their code. Such flows often involve the application calling out to external resources such as a database, message broker, or a third party service. The integration tests must cater for these calls, so each resource is typically substituted with an in-memory or mocked version by the test. This avoids the time consuming spin up of actual dependent resources. Each response from each external resource is therefore predictable, as it is defined in the test, whether a successful response or an error condition that must be handled correctly.

Spring Boot Tests

The spring-boot-starter-test module includes the libraries required to perform integration testing of a Spring Boot application. It includes JUnit Jupiter, Hamcrest and Mockito, along with the framework required to spin up the Spring application test context. This dependency is included in the service pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>

The test class is annotated with the @SpringBootTest annotation. When the test is executed the annotation ensures that the test container is bootstrapped, with a Spring application context created that is used by the tests. The annotation takes a number of attributes, including a webEnvironment that defines the servlet environment the application runs with (for example a mock environment, or an embedded server listening on a random port), and the location of the application and test configuration classes to use.

Demo Application Integration Test

Overview

The demo application integration test PaymentIntegrationTest tests both the REST endpoint provided by the application that triggers sending a payment, and also sending events to the Kafka topic that the application consumes from. As covered in the Kafka Schema Registry & Avro: Spring Boot Demo overview of the application, the send payment is a no-op for the purposes of the demo, as it focuses on the Kafka and Schema Registry interactions.

The following diagram shows the testSendPaymentViaKafka() test flow. The test writes a number of send payment command events to the send-payment topic. These are consumed by the application, resulting in payment-sent events being written to the payment-sent topic. The test listener consumes these and the test asserts the expected number of events are received.

Figure 1: Integration test of the consume and produce flow with Schema Registry

The test implementation is as follows, using the Awaitility library to allow time for the expected payment-sent events to be received by the test consumer:

@Test
public void testSendPaymentViaKafka() throws Exception {
   int totalMessages = 10;
   for (int i=0; i<totalMessages; i++) {
       String paymentId = UUID.randomUUID().toString();
       SendPayment command = buildSendPayment(paymentId);
       sendCommandViaKafka(command);
   }

   Awaitility.await().atMost(10, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
       .until(testReceiver.counter::get, equalTo(totalMessages));
}

private void sendCommandViaKafka(SendPayment command) throws Exception {
   final ProducerRecord record = new ProducerRecord(SEND_PAYMENT_TOPIC, null, command.getPaymentId(), command);
   testKafkaTemplate.send(record).get();
}
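
The buildSendPayment(..) helper used above is not listed in the article. A minimal sketch of how such a command event could be constructed using the Avro generated builder is shown below (the field values are purely illustrative):

private SendPayment buildSendPayment(String paymentId) {
   // Build the Avro generated SendPayment command for the given payment Id.
   return SendPayment.newBuilder()
           .setPaymentId(paymentId)
           .setAmount(10.00d)
           .setCurrency("GBP")
           .setToAccount("toAccount")
           .setFromAccount("fromAccount")
           .build();
}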

The test class itself is configured with a number of annotations alongside the @SpringBootTest annotation.

@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = { KafkaDemoConfiguration.class, TestKafkaDemoConfiguration.class } )
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@AutoConfigureWireMock(port=0)
@ActiveProfiles("test")
@EmbeddedKafka(controlledShutdown = true, topics = { "payment-sent" })
public class PaymentIntegrationTest {

Embedded Kafka Broker

The spring-kafka-test module provides an in-memory Kafka broker. This is utilised by the integration test to prove that events can be consumed from, and written to, a Kafka broker. It can be fully configured as required, such as the number of broker nodes to use, and the number of partitions and replication factor to use for auto-created topics.

All that is required to bring up the embedded Kafka broker in the test context is to annotate the integration test class with @EmbeddedKafka. In order to locate the in-memory broker the bootstrap-servers property is overridden in the application-test.yml:

kafka:
   bootstrap-servers: ${spring.embedded.kafka.brokers}

The spring.embedded.kafka.brokers value is configured by the spring-kafka-test module. The test and application producers and consumers interact with this in-memory broker in the same way they would with a real broker.
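
The embedded broker can be further tuned via the @EmbeddedKafka annotation attributes. The following is an illustrative sketch only (these are not the demo application's settings), showing a single broker node and three partitions for the auto-created topics:

// Illustrative @EmbeddedKafka configuration: one broker node, three partitions per
// auto-created topic, and the topics to create up front.
@EmbeddedKafka(
       controlledShutdown = true,
       count = 1,
       partitions = 3,
       topics = { "send-payment", "payment-sent" })
public class PaymentIntegrationTest {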

Schema Registry Wiremock

The demo application uses Apache Avro for serialization, with the schemas being registered and served by the Confluent Schema Registry. When a producer writes a message to a topic the message is serialized using Avro, and likewise deserialized using Avro by the consumer. The Kafka Schema Registry & Avro: Introduction article covered these Avro serialization and deserialization flows in detail. The application interacts with the Schema Registry using its REST API. These interactions happen in the Confluent Schema Registry library, but understanding these calls means that they can be stubbed using a wiremock in the integration test.

The spring-cloud-contract-wiremock module enables the integration test to seamlessly use a wiremock. The annotation @AutoConfigureWireMock(port=0) results in a random free port being assigned to the wiremock.server.port property, such that there are no port conflicts at test runtime. This port is then wired into the application-test.yml for the schema.registry.url when the Spring test application context is initialised, ensuring that the application has the correct URL to use for its REST calls to the Schema Registry:

kafka:
    schema.registry.url: http://localhost:${wiremock.server.port}/

The wiremock-jre8-standalone library is used to stub the REST calls. As shown in the test flow diagram above, the consumer calls 'get schema' to retrieve the schema associated with the incoming send-payment event. The schema Id is serialized as part of the event, and is extracted and used in the REST API GET. The response is the JSON schema associated with this Id.

In order to stub the wiremock with the required schema to return for this call, the schema must first be obtained. Each Avro generated POJO includes a static method that returns its schema:

@org.apache.avro.specific.AvroGenerated
public class SendPayment extends ..

public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"SendPayment\",\"namespace\":\"demo.kafka.event\",\"fields\":[{\"name\":\"payment_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"},{\"name\":\"to_account\",\"type\":\"string\"},{\"name\":\"from_account\",\"type\":\"string\"}],\"version\":\"1\"}");

public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

In the test setup this is then used to stub the calls for the two events of interest, send-payment and payment-sent:

registerSchema(1, SEND_PAYMENT_TOPIC, SendPayment.getClassSchema().toString());
registerSchema(2, PAYMENT_SENT_TOPIC, PaymentSent.getClassSchema().toString());

In the registerSchema(..) method in the integration test the wiremock is stubbed for each of these calls:

// Stub for the GET registered schema call for the given schema Id, returning the schema.
// This is used by the Consumer when deserializing.
// /schemas/ids/1?fetchMaxId=false
final SchemaString schemaString = new SchemaString(schema);
stubFor(get(urlPathMatching("/schemas/ids/"+schemaId))
       .willReturn(aResponse().withStatus(200).withHeader("Content-Type", "application/json").withBody(schemaString.toJson())));

Now when the KafkaAvroDeserializer calls the Schema Registry for the schema, it will receive the correct response.

On the producer side when an event is to be serialized using Avro, the KafkaAvroSerializer uses reflection on the class to get the schema, and then POSTs this to the Schema Registry to get the schema Id registered for this schema.

In the registerSchema(..) method again the wiremock is stubbed for this call for both events:

// Stub for the POST of the subject, to return the associated schemaId.
// (The Avro schema, obtained by the serializer by reflection, will be in the body POSTed).
// This is used by the Producer when serializing.
// /subjects/send-payment-value?deleted=false
stubFor(post(urlPathMatching("/subjects/"+topic+"-value"))
       .willReturn(aResponse().withStatus(200).withHeader("Content-Type", "application/json").withBody("{\"id\":"+schemaId+"}")));

The subject in the URL refers to the scope of the schema, defaulting to the topic name for the event followed by key or value. In the demo application only the value is serialized using Avro, so the subjects are send-payment-value and payment-sent-value. The response for this call is the schema Id, which is then serialized by the KafkaAvroSerializer as part of the outbound event.

Test Producer & Consumer

As the application is expecting Avro serialized events, the test must also use an Avro serializer. Likewise the events produced by the application are serialized using Avro, so the test consumer must deserialize accordingly. To achieve this a TestKafkaDemoConfiguration class is defined with the required KafkaAvroSerializer and KafkaAvroDeserializer for the test producer and consumer respectively. The test then uses a TestKafkaTemplate which is wired with these Spring beans when the Spring test application context is initialised, to send the send-payment events to the (in-memory) Kafka broker.

private void sendCommandViaKafka(SendPayment command) throws Exception {
   final ProducerRecord record = new ProducerRecord(SEND_PAYMENT_TOPIC, null, command.getPaymentId(), command);
   testKafkaTemplate.send(record).get();
}

On the receiving end the test consumer defines a receive method annotated with @KafkaListener. The containerFactory parameter ensures the required test listener container factory is wired in, which itself has the KafkaAvroDeserializer registered in the TestKafkaDemoConfiguration.

public static class KafkaTestListener {
   AtomicInteger counter = new AtomicInteger(0);

   @KafkaListener(groupId = "PaymentIntegrationTest", topics = "payment-sent", containerFactory = "testKafkaListenerContainerFactory", autoStartup = "true")
   void receive(@Payload final PaymentSent payload) {
       log.debug("KafkaTestListener - Received payment with Id: " + payload.getPaymentId());
       counter.incrementAndGet();
   }
}
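
The TestKafkaDemoConfiguration itself is not listed in the article. The following is a minimal sketch of how such a configuration could register the Avro serde classes for the test producer and consumer; the exact bean definitions in the demo application may differ:

@Configuration
public class TestKafkaDemoConfiguration {

   // Test producer: serializes event values with the KafkaAvroSerializer, mirroring how
   // the application itself produces events. Property names match the application-test.yml.
   @Bean
   public KafkaTemplate<String, Object> testKafkaTemplate(
           @Value("${kafka.bootstrap-servers}") String bootstrapServers,
           @Value("${kafka.schema.registry.url}") String schemaRegistryUrl) {
       Map<String, Object> config = new HashMap<>();
       config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
       config.put("schema.registry.url", schemaRegistryUrl);
       return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(config));
   }

   // Test listener container factory: deserializes event values with the KafkaAvroDeserializer,
   // with specific.avro.reader enabled so the Avro generated classes (e.g. PaymentSent) are
   // returned rather than GenericRecords.
   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, Object> testKafkaListenerContainerFactory(
           @Value("${kafka.bootstrap-servers}") String bootstrapServers,
           @Value("${kafka.schema.registry.url}") String schemaRegistryUrl) {
       Map<String, Object> config = new HashMap<>();
       config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
       config.put("schema.registry.url", schemaRegistryUrl);
       config.put("specific.avro.reader", true);
       ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
       return factory;
   }
}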

Test Execution

The integration test is executed using the maven-surefire-plugin defined in the application pom.xml as part of the test phase:

mvn clean test
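
For reference, a typical declaration of the plugin in the pom.xml build section takes the following standard form (the demo application's actual declaration may include further configuration):

<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-surefire-plugin</artifactId>
</plugin>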

Conclusion

Integration testing is a vital layer of application testing, providing fast feedback to the developer on integration issues that would otherwise not be discovered until later in the pipeline. The in-memory Kafka broker ensures that an application that consumes from or produces to Kafka can test and prove its integration with the message broker. Understanding the Avro serialization flows means that a wiremock can be used with these calls stubbed, proving that the integration with the Schema Registry behaves as expected.

Source Code

The source code for the accompanying Spring Boot demo application is available here:

https://github.com/lydtechconsulting/kafka-schema-registry-avro/tree/v1.0.0

More On Kafka Schema Registry & Avro

The following accompanying articles cover the Schema Registry and Avro:

Kafka Schema Registry & Avro: Introduction: provides an introduction to the schema registry and Avro, and details the serialization and deserialization flows.

Kafka Schema Registry & Avro: Spring Boot Demo (1 of 2): provides an overview of the companion Spring Boot application and steps to run the demo.

Kafka Schema Registry & Avro: Spring Boot Demo (2 of 2): details the Spring Boot application project structure, implementation and configuration which enables it to utilise the Schema Registry and Avro.

Kafka Schema Registry & Avro: Component Testing: looks at component testing the application using Testcontainers and the component-test-framework to bring up the application, Kafka, and the Schema Registry in Docker containers.

