Sunday, April 30, 2023

Deep-Dive into Spring Boot Kafka Components


In the previous chapter, we did a quick start with Kafka in Spring Boot. In this chapter, we're going to deep-dive into each of its components and options to see whether we can use it to implement our own use case. To connect to Kafka, we need to focus mainly on the following beans provided by Spring Kafka:

  • KafkaAdmin – To configure topics in Kafka programmatically
  • ProducerFactory – To define or override configurations to produce messages to Kafka
  • ConsumerFactory – To define or override configurations to consume messages from Kafka

Before we start looking into each of the components, let's again consider Steve and Jane's e-commerce app that we discussed in our previous chapters and pick a basic microservice that we can implement.

Since we'll be focusing on a simple pub-sub module, let's pick the Inventory Service, where we'll publish the new items added to the inventory list to a Kafka topic and then consume them to update or forward that information to other services or topics.

Configuration of Kafka Topics

Let's start with our first component of Kafka in Spring Boot. Until now, as you may have noticed, we have used command-line tools to create topics in Kafka. For example, in the previous chapter we ran something like this:

bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic test \
    --config min.insync.replicas=2

In Spring Boot, however, Kafka's AdminClient has been integrated so that topics can be created programmatically. We'll define a configuration that uses a KafkaAdmin bean to define new topics. We'll fetch the bootstrap servers and the topic details from the properties defined in application.yml, which we also saw in the previous chapter:

server:
  port: 8080
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory

Let's define our KafkaTopicConfig class:

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .compact()
                .build();
    }
}

This creates the product-inventory topic in Kafka with the given configuration. KafkaAdmin can also define multiple topics in one go by chaining several TopicBuilder definitions:

@Bean
public KafkaAdmin.NewTopics multipleTopics() {
    return new KafkaAdmin.NewTopics(
           TopicBuilder.name("stackabuse-product-inventory")
                   .build(),
           TopicBuilder.name("product-inventory-bytes")
                   .replicas(1)
                   .build(),
           TopicBuilder.name("fulfillment-station")
                   .partitions(5)
                   .build()
    );
}

As you can see, we're also creating a few more topics: stackabuse-product-inventory, product-inventory-bytes and fulfillment-station. We'll look into the use case for each of them as we progress through this chapter.

Publishing Messages to Topic

Once we're done creating topics, let's look at how to publish messages. Since Producer instances are thread safe, using a single instance throughout an application context gives higher performance. So let's start by defining a KafkaTemplate instance and use it to send messages.

KafkaTemplate

We first need to create a ProducerFactory instance, which sets the strategy for configuring Kafka Producer instances. Then we'll define a KafkaTemplate bean that wraps the Producer instance and provides various convenience methods to send messages to a Kafka topic. So we'll first define a HashMap of properties, which we'll pass to the ProducerFactory, which in turn is passed to the KafkaTemplate:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Since KafkaTemplate instances are also thread safe, it's recommended to use a single instance. Now we'll use this KafkaTemplate instance to send our messages. So let's define our Controller class, which receives messages through an API and produces them to the topic.

@Slf4j
@RestController
@RequestMapping(value = "/inventory")
public class PublishInventoryController {

    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping(value = "/publish")
    public void sendMessage(@RequestParam("message") String message) {
        log.info(String.format("Message sent to inventory -> %s", message));
        kafkaTemplate.send(topic, message);
    }
}

Now if we start our application, we can publish messages to the topic by calling the following API:

curl -i -X POST \
   -H "Content-Type:application/json" \
   -d '' \
 'http://localhost:8080/inventory/publish?message=Hello%20Inventory'

The send method in KafkaTemplate is asynchronous. Kafka is a fast stream-processing platform, so it is better to handle the results asynchronously so that subsequent messages don't wait for the result of the previous one. If we do need to block the sending thread and act on the result, send returns a ListenableFuture (a CompletableFuture as of Spring for Apache Kafka 3.0). We can call get on the future so that the thread waits for the result, but this slows down the producer.

@PostMapping(value = "/publish-with-callback")
public void sendMessageWithCallback(@RequestParam("message") String message) {
    log.info(String.format("Message with callback sent to inventory -> %s", message));
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info("Sent message=[" + message +
                     "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("Unable to send message=[" + message + "] due to error : " + ex.getMessage());
        }
    });
}

Now we can call the API again, but this time the result reported by the callback is printed in the logs.

curl -i -X POST \
   -H "Content-Type:application/json" \
   -d '' \
 'http://localhost:8080/inventory/publish-with-callback?message=Hello%20Inventory'
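The trade-off between registering a callback and blocking on get can be sketched with the JDK's CompletableFuture alone. This is only an illustration and does not touch Kafka: fakeSend is a hypothetical stand-in for an asynchronous send, and the "offset" it returns is just the message length.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SendStyles {

    // Hypothetical stand-in for an asynchronous send; completes with a fake offset.
    static CompletableFuture<Long> fakeSend(String message) {
        return CompletableFuture.supplyAsync(() -> (long) message.length());
    }

    // Non-blocking style: attach a callback and return to the caller immediately.
    static CompletableFuture<String> sendWithCallback(String message) {
        return fakeSend(message).handle((offset, ex) ->
                ex == null ? "sent offset=" + offset : "failed: " + ex.getMessage());
    }

    // Blocking style: the calling thread waits until the result is available.
    static long sendAndWait(String message) {
        try {
            return fakeSend(message).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("send did not complete", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendWithCallback("Hello").join());
        System.out.println(sendAndWait("Hello"));
    }
}
```

In the blocking variant, each request thread is held until the broker acknowledges the record, which is why it lowers producer throughput.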

If we don't want to work with the Future, we can also register a ProducerListener instance and pass it when creating the KafkaTemplate instance.

@Slf4j
@Configuration
public class KafkaProducerConfig {

    ...

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
        template.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord,
                                  RecordMetadata recordMetadata) {
                log.info("ACK received from ProducerListener message: {} offset: {}",
                        producerRecord.value(), recordMetadata.offset());
            }
        });
        return template;
    }
}

RoutingKafkaTemplate

Suppose we have multiple producers with different configurations or topic names. We can then use RoutingKafkaTemplate to select a producer at runtime, based on a regex match against the topic name.

@Configuration
public class KafkaProducerConfig {

    ...

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
        // ProducerFactory with Bytes serializer
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        // ProducerFactory with String serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);
        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile(".*-bytes"), bytesPF);
        map.put(Pattern.compile("stackabuse-.*"), stringPF);
        return new RoutingKafkaTemplate(map);
    }
}

As we can see, RoutingKafkaTemplate takes a Map of java.util.regex.Pattern to ProducerFactory instances and routes each message to the first ProducerFactory whose pattern matches the topic name. Because the patterns are tried in order, an ordered map such as LinkedHashMap is used. We've created two patterns: .*-bytes, which uses ByteArraySerializer, and stackabuse-.*, which uses StringSerializer. As you may have noticed, we created these topics earlier.
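The first-match routing described above can be sketched with the JDK's regex classes alone. This is a simplified illustration, not the RoutingKafkaTemplate implementation: the TopicRouter class is hypothetical, the factories are represented by their bean names, and full-string matching is an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicRouter {

    // Insertion order matters: patterns are tried from first to last.
    private static final Map<Pattern, String> ROUTES = new LinkedHashMap<>();

    static {
        ROUTES.put(Pattern.compile(".*-bytes"), "bytesPF");
        ROUTES.put(Pattern.compile("stackabuse-.*"), "stringPF");
    }

    // Returns the name of the first factory whose pattern matches the whole topic name.
    static String route(String topic) {
        for (Map.Entry<Pattern, String> entry : ROUTES.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue();
            }
        }
        throw new IllegalStateException("No producer factory matches topic " + topic);
    }

    public static void main(String[] args) {
        System.out.println(route("product-inventory-bytes"));      // bytesPF
        System.out.println(route("stackabuse-product-inventory")); // stringPF
        System.out.println(route("stackabuse-bytes"));             // bytesPF, the earlier pattern wins
    }
}
```

A topic such as stackabuse-bytes matches both patterns, so the order of the map decides which serializer is used. More specific patterns should therefore be registered first.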

Consuming Messages from Topic

Now that we're done with publishing messages to a Kafka topic, let's quickly look at the various ways to consume messages from a given topic.

Message Listener Containers

We can receive messages by configuring a MessageListenerContainer as a bean. Spring Boot provides two MessageListenerContainer implementations:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread, whereas the ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

@KafkaListener at Method Level

To receive messages asynchronously from Kafka, Spring Boot also provides the @KafkaListener annotation, which requires a ConsumerFactory and a KafkaListenerContainerFactory bean to be configured. Note also that the @EnableKafka annotation is required on a configuration class to enable detection of @KafkaListener annotations on Spring-managed beans.

Let's first define KafkaConsumerConfig:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServer;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value(value = "${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetResetConfig;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

Then we'll define a service layer that consumes the messages and prints them to the console:

@Slf4j
@Service
public class ConsumeInventoryService {

    @KafkaListener(id = "${spring.kafka.consumer.group-id}",
            topics = "${spring.kafka.consumer.topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            autoStartup = "${spring.kafka.consumer.auto-start:true}")
    public void listenMessage(String message,
                              @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                              Integer key,
                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                              @Header(KafkaHeaders.GROUP_ID) String groupId,
                              @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                              @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
                message,
                key,
                topic,
                groupId,
                partition,
                ts);
    }
}

As you can see, we're also printing the various Kafka message headers using the KafkaHeaders constants.

@KafkaListener at Class Level

In the previous section, we used @KafkaListener at the method level. Spring Boot also allows @KafkaListener at the class level. To do that, we must annotate the handler methods with @KafkaHandler. When a message is received, the method to call is determined by the converted message payload type. We can also designate one @KafkaHandler method as the default, which is used when no other method matches.

@Slf4j
@Service
@KafkaListener(id = "multi-"+"${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "multi-"+"${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {

    @KafkaHandler
    void listen(String message) {
        log.information("KafkaHandler-String: {}", message);
    }

    @KafkaHandler(isDefault = true)
    Object listenDefault(Object object) {
        log.information("KafkaHandler-Default: {}", object);
        return object;
    }
}

Message Forwarder using @SendTo annotation

Spring Boot also allows us to annotate a @KafkaListener with @SendTo to forward messages, as they are received, to another topic. To do this, we need to modify the configuration to set a reply template on the Kafka listener container factory.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate);
    factory.setBatchListener(true);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

Finally, we can add a @SendTo annotation alongside @KafkaListener. The value returned by the listener method is what gets forwarded:

@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
@SendTo("${spring.kafka.consumer.forward-to}")
public String listenMessage(String message,
                            @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                            Integer key,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                            @Header(KafkaHeaders.GROUP_ID) String groupId,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
     log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
             message,
             key,
             topic,
             groupId,
             partition,
             ts);
     // @SendTo forwards the return value; a void method would forward nothing.
     return message;
}

@KafkaListener(id = "multi-"+"${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "multi-"+"${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {

    @KafkaHandler
    @SendTo("${spring.kafka.consumer.forward-to}")
    String listen(String message) {
        log.info("KafkaHandler-String: {}", message);
        return message;
    }

    @KafkaHandler(isDefault = true)
    @SendTo("${spring.kafka.consumer.forward-to}")
    Object listenDefault(Object object) {
        log.info("KafkaHandler-Default: {}", object);
        return object;
    }
}

Read from a specific partition

If we have topics with multiple partitions, @KafkaListener can subscribe explicitly to particular partitions of a topic with an initial offset:

@Slf4j
@Service
// topics and topicPartitions are mutually exclusive, so only topicPartitions is set here.
@KafkaListener(id = "multi-"+"${spring.kafka.consumer.group-id}",
        groupId = "multi-"+"${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}", partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "0"),
                @PartitionOffset(partition = "4", initialOffset = "0")}))
public class ConsumeInventoryMultiListenerService {

    ...
}

Every time this listener is initialized, all previously consumed messages from partitions 0 and 4 are consumed again, because initialOffset is set to 0 for each of these partitions in the listener.

If we don't need to set the offset, we can simply use the partitions property to assign partitions without an offset.

// topics and topicPartitions are mutually exclusive, so only topicPartitions is set here.
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
public void listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message,
            key,
            topic,
            groupId,
            partition,
            ts);
}

Message filters in Listeners

If we need to filter incoming messages based on a regex or some content, we can add a custom filter to the listener by setting a RecordFilterStrategy on the container factory.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate);
    factory.setBatchListener(true);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    // The value is a String here, so we test its content directly.
    factory.setRecordFilterStrategy(
            record -> record.value().contains("Inventory"));
    return factory;
}

Then we can pass this container factory to the @KafkaListener annotation:

// topics and topicPartitions are mutually exclusive, so only topicPartitions is set here.
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}),
        containerFactory = "kafkaListenerContainerFactory")
@SendTo("${spring.kafka.consumer.forward-to}")
public void listenMessage(String message,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message,
            key,
            topic,
            groupId,
            partition,
            ts);
}

Thus, all messages that match the filter are discarded before they reach this listener. In this way, we can subscribe to messages from the topic in various ways and process the data as needed.
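It is easy to read the filter the wrong way round: a record for which the strategy returns true is dropped, not kept. A minimal sketch of that semantics using only JDK types (FilterSketch and deliver are hypothetical names used for illustration, not Spring Kafka classes):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {

    // Mimics the filter semantics: records for which discardIf is true never reach the listener.
    static List<String> deliver(List<String> records, Predicate<String> discardIf) {
        return records.stream()
                .filter(discardIf.negate())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> incoming = List.of("Hello Inventory", "Hello Orders");
        // Same condition as the strategy configured on the container factory.
        System.out.println(deliver(incoming, value -> value.contains("Inventory")));
    }
}
```

With the strategy from this section, a message containing "Inventory" is therefore the one that gets discarded. To keep only such messages, the condition would need to be negated.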

Custom Serializers/Deserializers

Until now, we have been sending and receiving String messages. However, Spring Boot also supports sending and receiving custom objects. That requires additional configuration: a serializer in ProducerFactory and a deserializer in ConsumerFactory.

At the start of this chapter, we decided to work on the Inventory service for the e-commerce app. So let's now define a simple bean class for an item stored in the inventory:

@ToString
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class InventoryItem {

    private String id;
    private String name;
    private int count;
    private Date listingDate;
}

Here we're using Lombok annotations to generate a builder and the constructors. We'll use the builder in our upcoming implementation.

Send Custom Messages

Let's use JsonSerializer to serialize the object and send it as JSON content to the topic. To do that, we need to modify our existing ProducerFactory and update our KafkaTemplate:

@Configuration
public class KafkaProducerConfig {

    ...
        
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, InventoryItem> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Now we can update our Controller implementation to send an InventoryItem object through KafkaTemplate:

@Autowired
private KafkaTemplate<String, InventoryItem> kafkaTemplate;

@PostMapping(value = "/publish")
@Transactional("kafkaTransactionManager")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
            .id(UUID.randomUUID().toString())
            .name(name)
            .count(count)
            .listingDate(new Date())
            .build();
    log.info(String.format("Message sent to inventory -> %s", item));
    kafkaTemplate.send(topic, item);
}

Receive Custom Messages

To consume the same objects, we need to update our ConsumerFactory and ConcurrentKafkaListenerContainerFactory to accept the JSON objects and convert them into Java POJOs:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    ...

    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(InventoryItem.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, InventoryItem> kafkaListenerContainerFactory(
            KafkaTemplate<String, InventoryItem> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, InventoryItem> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate);
        factory.setBatchListener(true);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setRecordFilterStrategy(
                record -> record.value().getName().contains("Inventory"));
        return factory;
    }
}

Now we need to update the @KafkaListener in our service layer to consume the InventoryItem.

// topics and topicPartitions are mutually exclusive, so only topicPartitions is set here.
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
public void listenMessage(@Payload @Valid InventoryItem item,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            item,
            key,
            topic,
            groupId,
            partition,
            ts);
}

We can annotate the payload with @Valid to validate incoming messages and accept or reject them accordingly.

Transaction Support

Kafka supports transactions, which provide exactly-once message delivery between producer and consumer applications through the Transactional API. Applications that follow the "consume-process-produce" pattern need transactions to make their operations atomic. For example, with the following code pattern we need to ensure exactly-once message delivery:

1. consume(message) {
2.     process(message);
3.     template.send("topic", message1);
4.     saveInDb(message);
5.     template.send("topic", message2);
6. }

Let's take a quick look at the various places where an exception can occur:

  • If an exception occurs at line 2, the message is consumed again when the consumer restarts. This is harmless if processing the message is idempotent.
  • If an exception is thrown at line 3, message1 is published again when the consumer restarts. Thus message1 may be published twice.
  • If an exception is thrown at line 4, the message is saved to the database again when the consumer restarts. Thus the message may be saved twice, and message1 is also published twice.
  • If an exception occurs after line 4, message2 is published again when the consumer restarts.

Now consider that we have transactions in place. Whenever an exception occurs, all messages published and all database operations performed within the transaction are rolled back. When the consumer restarts, the operations within the transaction are committed only if all of them succeed.

Initially, Kafka supported only at-most-once or at-least-once message delivery. With the introduction of transactions between Kafka brokers and clients, applications can guarantee exactly-once message delivery.
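The difference a transaction makes can be illustrated with a small JDK-only simulation. It is a sketch under simplifying assumptions, not Kafka code: a list stands in for the topic, the handler is made to fail once at the database step (line 4), and "commit" simply means copying the buffered sends into the visible list.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactionSketch {

    // Mirrors lines 3 to 5 of the handler; optionally fails at the database step.
    static void handle(String message, List<String> out, boolean failOnSave) {
        out.add(message + "-1");                                 // line 3: send message1
        if (failOnSave) {
            throw new IllegalStateException("saveInDb failed");  // line 4
        }
        out.add(message + "-2");                                 // line 5: send message2
    }

    // Without a transaction every send is visible immediately, even if the handler fails later.
    static List<String> withoutTransaction(String message) {
        List<String> topic = new ArrayList<>();
        try {
            handle(message, topic, true);    // first attempt fails midway
        } catch (IllegalStateException e) {
            // the consumer restarts and the same message is redelivered
        }
        handle(message, topic, false);       // the retry succeeds
        return topic;
    }

    // With a transaction, sends are buffered and become visible only on commit.
    static List<String> withTransaction(String message) {
        List<String> topic = new ArrayList<>();
        for (boolean fail : new boolean[] {true, false}) {
            List<String> pending = new ArrayList<>();
            try {
                handle(message, pending, fail);
                topic.addAll(pending);       // commit
            } catch (IllegalStateException e) {
                // abort: the pending sends are dropped
            }
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(withoutTransaction("m")); // [m-1, m-1, m-2]
        System.out.println(withTransaction("m"));    // [m-1, m-2]
    }
}
```

The first variant reproduces the duplicate message1 from the third bullet above, while the second publishes each message once because the failed attempt leaves nothing behind.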

A Transaction-aware Producer

We can configure a typical Kafka producer to enable transactions. First, we need to enable idempotence and specify a prefix for the transaction id:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      enable-idempotence: true
      transaction-id-prefix: tx-

Then we’ll update our producer configuration to pass these values as properties for the Kafka producer:

@Configuration
public class KafkaProducerConfig {

    ...

    @Value(value = "${spring.kafka.producer.enable-idempotence}")
    private String enableIdempotence;

    @Value(value = "${spring.kafka.producer.transaction-id-prefix}")
    private String transactionIdPrefix;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        DefaultKafkaProducerFactory<String, InventoryItem> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }

    @Bean
    public KafkaTransactionManager<String, InventoryItem> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }

    ...
}

As we have enabled idempotence, Kafka can notice when the producer sends the same message to the topic more than once, for example on a retry, and deduplicate it. The transaction id, built from this prefix, lets the broker recognise the same producer across restarts. We just need to make sure that the transaction id is distinct for each producer while staying the same when that producer restarts.
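To make the deduplication idea concrete, here is a deliberately simplified model. It assumes the broker remembers the last sequence number it accepted from each producer and ignores anything it has already seen. The class IdempotentLogDemo and its method names are hypothetical, and the real broker logic (which works per partition and on batches) is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of broker-side deduplication; not Kafka's actual code.
final class IdempotentLogDemo {

    final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if the record was appended, false if it was a duplicate retry.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: the producer is retrying
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("out-of-order sequence " + sequence);
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    public static void main(String[] args) {
        IdempotentLogDemo broker = new IdempotentLogDemo();
        System.out.println(broker.append(7L, 0, "item-a")); // first send
        System.out.println(broker.append(7L, 0, "item-a")); // retry of the same send
        System.out.println(broker.append(7L, 1, "item-b")); // next message
        System.out.println(broker.log);
    }
}
```

The retried record is rejected, so the log ends up with item-a and item-b exactly once each.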

We can also notice that we have defined a Transaction Manager. We need to annotate the existing method that sends the message with @Transactional and pass this transaction manager bean. We also use the executeInTransaction() method instead of a plain send(). Note that, according to the Spring for Apache Kafka documentation, executeInTransaction() runs its callback in a local transaction of its own rather than joining the one started by @Transactional.

@PostMapping(value = "/publish")
@Transactional("kafkaTransactionManager")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
             .id(UUID.randomUUID().toString())
             .name(name)
             .count(count)
             .listingDate(new Date())
             .build();
    log.info(String.format("Message sent to inventory -> %s", item));
    kafkaTemplate.executeInTransaction(t -> t.send(topic, item));
}

This registers the producer with the broker, which can now use transactions. The broker in turn uses these to perform write-ahead transactions to a transaction log. The broker will also remove any actions from that log that belong to a producer with the same transaction id but from an earlier epoch.
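The role of the epoch can be illustrated with a minimal sketch. It assumes the broker keeps one epoch counter per transaction id, bumps it whenever a producer registers with that id, and accepts work only from the latest epoch. FencingDemo, register() and accepts() are hypothetical names, and the id "tx-0" is just an example built from the tx- prefix configured earlier.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of epoch-based fencing; not Kafka's actual coordinator code.
final class FencingDemo {

    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // A producer registering with a transaction id receives a new, higher epoch.
    int register(String transactionalId) {
        return latestEpoch.merge(transactionalId, 1, Integer::sum);
    }

    // Only the most recently registered producer for an id is accepted.
    boolean accepts(String transactionalId, int epoch) {
        Integer latest = latestEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }

    public static void main(String[] args) {
        FencingDemo broker = new FencingDemo();
        int oldEpoch = broker.register("tx-0"); // original producer instance
        int newEpoch = broker.register("tx-0"); // same producer after a restart
        System.out.println("old instance accepted: " + broker.accepts("tx-0", oldEpoch));
        System.out.println("new instance accepted: " + broker.accepts("tx-0", newEpoch));
    }
}
```

This is why the transaction id has to stay the same across restarts: the restarted instance takes over the id, and anything still in flight from the earlier instance is rejected.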

A Transaction-aware Consumer

Similarly, we can make sure that the consumers respect transactions while consuming messages from the topic. Although we always read the messages of a topic partition in offset order, we have two options for reading transactional messages:

  • read_committed: This allows us to read not only the messages that are not part of a transaction, but also the transactional ones once their transaction is committed.
  • read_uncommitted: This allows us to read all the messages in offset order without waiting for their transactions to be committed.
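The difference between the two levels can be sketched with a toy partition log. This model assumes that a read_committed reader skips records of aborted transactions and stops at the first record of a transaction that is still open. The class IsolationDemo, its LogEntry type and the sample data are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of consumer isolation levels; not Kafka's actual fetch logic.
final class IsolationDemo {

    enum TxState { OPEN, COMMITTED, ABORTED }

    static final class LogEntry {
        final String value;
        final String txId; // null marks a non-transactional record

        LogEntry(String value, String txId) {
            this.value = value;
            this.txId = txId;
        }
    }

    static List<String> read(List<LogEntry> log, Map<String, TxState> states, boolean readCommitted) {
        List<String> visible = new ArrayList<>();
        for (LogEntry entry : log) {
            if (readCommitted && entry.txId != null) {
                TxState state = states.get(entry.txId);
                if (state == TxState.OPEN) {
                    break;    // cannot read past a transaction that is still open
                }
                if (state == TxState.ABORTED) {
                    continue; // aborted records are never delivered
                }
            }
            visible.add(entry.value);
        }
        return visible;
    }

    static List<LogEntry> sampleLog() {
        return Arrays.asList(
                new LogEntry("a", null),
                new LogEntry("b", "tx-1"),
                new LogEntry("c", "tx-2"),
                new LogEntry("d", null),
                new LogEntry("e", "tx-3"),
                new LogEntry("f", null));
    }

    static Map<String, TxState> sampleStates() {
        Map<String, TxState> states = new HashMap<>();
        states.put("tx-1", TxState.COMMITTED);
        states.put("tx-2", TxState.ABORTED);
        states.put("tx-3", TxState.OPEN);
        return states;
    }

    public static void main(String[] args) {
        System.out.println("read_uncommitted: " + read(sampleLog(), sampleStates(), false));
        System.out.println("read_committed:   " + read(sampleLog(), sampleStates(), true));
    }
}
```

With the sample data, read_uncommitted returns all six records, while read_committed returns a, b and d: c belongs to an aborted transaction, and e and f sit behind a transaction that has not finished yet.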

We thus need to define isolation.level in the consumer config to pass on this property. The default value of isolation.level is read_uncommitted.

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
      enable-auto-commit: false
      isolation-level: read_committed

Then we’ll update our consumer configuration to pass these values as properties for the Kafka consumer:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    ...

    @Value(value = "${spring.kafka.consumer.isolation-level}")
    private String isolationLevel;

    @Value(value = "${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(InventoryItem.class));
    }

    ...
}

Then we can also annotate our @KafkaListener method with the @Transactional annotation to make sure it runs within a transaction.

// topics and topicPartitions are mutually exclusive on @KafkaListener,
// so only topicPartitions is set here.
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}",
        topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
                partitions = { "0", "1", "3", "6", "8"}))
@SendTo("${spring.kafka.consumer.forward-to}")
@Transactional("kafkaTransactionManager")
public void listenMessage(@Payload InventoryItem item,
                          @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
                          Integer key,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                          @Header(KafkaHeaders.GROUP_ID) String groupId,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received Message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
             item,
             key,
             topic,
             groupId,
             partition,
             ts);
}

Conclusion

In this chapter, we took a fairly deep dive into the various features provided by the Kafka support for Spring. We’ve also built the Inventory Service for the ecommerce-app that we started with to help out Steve and Jane.

The overall implementation for this and the previous chapter can be found on GitHub.

In the next chapter, we’ll take a look at Spring Cloud Stream and its various binders for Kafka.
