Kafka Consumer — Spring Kafka

Shivaganesh
3 min read · Aug 15, 2023


Kafka is a widely used technology for implementing event-driven architecture. It is a distributed publish-subscribe (pub-sub) messaging system that can handle high volumes of data. Producers publish messages, i.e., Kafka events, to a topic, which is consumed by the consumers subscribed to that topic.

There are various ways to implement Kafka based applications.

  1. Kafka Connect — Kafka Connect is a configuration-based approach. No coding is required, so it is very easy to implement. If data transformation is needed, it can be achieved using Single Message Transforms (SMTs). For complex transformations that are not available as SMTs, Kafka Streams and ksqlDB can be used, which does require coding. Confluent provides various source (producing Kafka events) and sink (consuming events and storing them in a destination) connectors that allow producing or consuming Kafka events from/to different systems and storages, e.g., the JDBC Source/Sink Connector, the MongoDB Sink Connector, etc. Kafka configuration is limited to what the connector exposes.
  2. Spring Cloud Stream — This is a Spring Cloud approach that is mostly configuration based. It makes it easy to change the underlying messaging system: with a small configuration change, Kafka, RabbitMQ, etc. can be used as the messaging system. The spring-cloud-stream-binder-kafka library is used to implement Kafka-based messaging applications. Kafka configuration is limited to the features supported by Spring Cloud Stream.
  3. Spring Kafka — A Spring-based solution. It requires coding effort, but it allows configuring Kafka fully as per your needs. The spring-kafka library is used to build the application: KafkaTemplate is used to produce Kafka messages and @KafkaListener is used to configure consumers.
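As a quick illustration of the Spring Kafka approach, producing an event with KafkaTemplate can be sketched as below. This is a minimal sketch, assuming a String-serialized template; the OrderService class and the "orders" topic name are hypothetical examples, not from the original article.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical service that publishes events via an injected KafkaTemplate.
@Service
public class OrderService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrder(String orderId, String payload) {
        // send(topic, key, value); using the order id as the key keeps
        // events for the same order on the same partition
        kafkaTemplate.send("orders", orderId, payload);
    }
}
```

In recent Spring Kafka versions, send() returns a CompletableFuture that can be used to react to delivery success or failure.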

Kafka Consumer using Spring Kafka

Before implementing the Kafka consumer, the consumer configuration should be set up. The two main configurations are the consumer factory and the listener container factory.

@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); // Kafka broker address
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // where to start reading events: latest/earliest
    // wrap the real deserializers so a deserialization failure doesn't poison the consumer
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); // schema registry address if using Avro based messages
    props.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, false); // do not auto-register new schemas
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); // deserialize to specific Avro classes
    // build from props only; passing deserializer instances to the constructor
    // would override the ErrorHandlingDeserializer setup above
    return new DefaultKafkaConsumerFactory<>(props);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> myListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory()); // set the consumer factory
    return factory;
}

Spring Kafka allows implementing a Kafka listener/consumer by placing the @KafkaListener annotation on the method that contains the consuming logic.

@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topic}",
        containerFactory = "myListenerContainerFactory")
public void listen(@Headers MessageHeaders headers, @Payload MyRecord record) throws IOException {
    log.info("Headers: {}", headers);
    log.info("RECEIVED_KEY: {}", headers.get(KafkaHeaders.RECEIVED_KEY));
    log.info("RECEIVED_TOPIC: {}", headers.get(KafkaHeaders.RECEIVED_TOPIC));
    log.info("Payload: {}", record);
}

The code above is a Kafka consumer that belongs to a consumer group. A consumer group is a set of consumers that share the work of consuming a topic; each partition of the topic is assigned to exactly one consumer in the group. Each Kafka message has message headers and a payload. The headers contain Kafka's own headers plus any custom headers set by the producer application. The payload is the Kafka event itself.
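Instead of receiving the full MessageHeaders map, individual headers can be bound with the @Header annotation. The sketch below assumes a custom header named "x-correlation-id" set by the producer; that header name is a hypothetical example.

```java
// Sketch: binding individual headers with @Header rather than the whole map.
// "x-correlation-id" is a hypothetical custom header set by the producer.
@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topic}",
        containerFactory = "myListenerContainerFactory")
public void listen(@Payload MyRecord record,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(name = "x-correlation-id", required = false) String correlationId) {
    log.info("partition={}, correlationId={}, payload={}", partition, correlationId, record);
}
```

Marking a custom header as required = false avoids a binding failure when the producer did not set it.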

In complex applications, there is a good chance that the consumer will not be able to successfully consume and process an incoming message in one go. In such cases, retry logic is necessary. With plain blocking retries, message consumption can slow down. If message ordering is not a concern, non-blocking retries can be implemented instead. Implementing this logic by hand is quite complex, but Spring Kafka now allows implementing non-blocking retries using the @RetryableTopic annotation.

@RetryableTopic(
        attempts = "3",
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
        backoff = @Backoff(delay = 3000, multiplier = 2.0),
        exclude = {SerializationException.class, DeserializationException.class},
        autoCreateTopics = "true")

It allows configuring the number of retries and an exponential backoff strategy. More interestingly, it creates and configures the retry and dead-letter topics (DLT) for you. It also allows including or excluding the exceptions for which retries should be attempted before the message is moved to the DLT.
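Putting it together, the annotation goes on the @KafkaListener method, and a @DltHandler method in the same class receives messages once all attempts are exhausted. This is a sketch under the same configuration as above; the process() call stands in for hypothetical business logic.

```java
// Sketch: non-blocking retries with @RetryableTopic plus a DLT handler.
// A thrown exception in listen() sends the message to the next retry topic.
@RetryableTopic(
        attempts = "3",
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
        backoff = @Backoff(delay = 3000, multiplier = 2.0),
        exclude = {SerializationException.class, DeserializationException.class},
        autoCreateTopics = "true")
@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${topic}",
        containerFactory = "myListenerContainerFactory")
public void listen(@Payload MyRecord record) {
    process(record); // hypothetical business logic; failures trigger a retry
}

// Invoked when all retry attempts are exhausted and the message lands in the DLT.
@DltHandler
public void handleDlt(@Payload MyRecord record,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.error("Message moved to DLT {}: {}", topic, record);
}
```

With SUFFIX_WITH_INDEX_VALUE, the retry topics are suffixed with the attempt index (e.g. -retry-0, -retry-1), and the DLT gets a -dlt suffix by default.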

In this article, I have provided a brief overview of implementing a Kafka consumer. The Spring Kafka based consumer implementation code can be found here. Thank you. If you liked the article, don’t forget to like it.


Written by Shivaganesh

Blockchain Developer experienced in developing Supply chain, Pharma, Telecom domain solutions using Hyperledger Fabric, R3 Corda, Symbiont Assembly and DAML
