In the previous chapter, we took a quick start with Kafka in Spring Boot. In this chapter, we're going to deep-dive into each of its components and options to understand whether we can use it to implement our own use-case. In order to connect with Kafka, we need to focus primarily on the following beans provided by Kafka:
KafkaAdmin – to configure topics in Kafka programmatically
ProducerFactory – to define or override configurations for producing messages to Kafka
ConsumerFactory – to define or override configurations for consuming messages from Kafka
Now, before we start looking into each of the components, we'll again consider Steve and Jane's e-commerce app that we discussed in the previous chapters and pick a basic microservice that we can implement.
Since we will be focusing on a simple pub-sub module, let's pick the Inventory Service, where we'll publish the new items added to the inventory list into a Kafka topic and then consume them to update or forward that information to other services or topics.
Configuration of Kafka Topics
Let's start with our first component of Kafka in Spring Boot. Until now, as you may have seen, we have used command-line tools to create topics in Kafka. For example, in the previous chapter we ran something like this:
bin/kafka-topics.sh \
  --create \
  --zookeeper localhost:2181 \
  --replication-factor 3 \
  --partitions 3 \
  --topic test \
  --config min.insync.replicas=2
But Spring Boot integrates the AdminClient from Kafka to create topics programmatically. We'll define a configuration that uses a KafkaAdmin bean to define new topics. We'll fetch the bootstrap servers and the topic details from the properties defined in application.yml, which we have seen in the previous chapter as well:
server:
  port: 8080

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
Let's define our KafkaTopicConfig class:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(topic)
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
    }
}
This would create the product-inventory topic in Kafka with the given configuration. KafkaAdmin also provides the capability to define multiple topics in a sequence by declaring several TopicBuilder configs:
@Bean
public KafkaAdmin.NewTopics multipleTopics() {
    return new KafkaAdmin.NewTopics(
        TopicBuilder.name("stackabuse-product-inventory")
            .build(),
        TopicBuilder.name("product-inventory-bytes")
            .replicas(1)
            .build(),
        TopicBuilder.name("fulfillment-station")
            .partitions(5)
            .build()
    );
}
As you can see, we're also creating a few more topics, such as stackabuse-product-inventory, product-inventory-bytes, and fulfillment-station. We'll look into each of their use-cases further in this chapter as we progress.
Publishing Messages to a Topic
Once we're done with the creation of topics, let's look into the mechanism to publish messages. Since Producer instances are thread-safe, using a single instance throughout an application context gives higher performance. So let's start by defining a KafkaTemplate instance and use it to send messages.
KafkaTemplate
We need to first create a ProducerFactory instance to set the strategy for configuring a Kafka Producer instance. Then we'll define a KafkaTemplate bean, which wraps this Producer instance and provides various convenience methods to send messages to a Kafka topic. So we'll first define a HashMap to hold the properties that we'll pass to the ProducerFactory, which in turn will be passed to the KafkaTemplate:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Since KafkaTemplate instances are also thread-safe, it's always recommended to use a single instance of it. Now we'll use this KafkaTemplate instance to send our messages. So let's define our Controller class to receive messages over an API and produce them to the topic:
@Slf4j
@RestController
@RequestMapping(value = "/inventory")
public class PublishInventoryController {

    @Value(value = "${spring.kafka.producer.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping(value = "/publish")
    public void sendMessage(@RequestParam("message") String message) {
        log.info(String.format("Message sent to inventory -> %s", message));
        kafkaTemplate.send(topic, message);
    }
}
Now, if we start our application, we can publish messages into the topic by calling the following API:
curl -i -X POST \
  -H "Content-Type: application/json" \
  -d '' \
  'http://localhost:8080/inventory/publish?message=Hello%20Inventory'
The send method in KafkaTemplate is asynchronous in nature. Kafka is quite a fast stream-processing platform, hence it's always better to handle the results asynchronously so that subsequent messages don't wait for the result of the previous message. But in any case, if we want to block the sending thread and perform some actions, send() also returns a ListenableFuture object. We can call the get API of the ListenableFuture object so that the thread waits for the callback result, but this will slow down the producer.
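As a minimal sketch (assuming the same kafkaTemplate and topic fields as above), such a blocking send could look like this:
try {
    SendResult<String, String> result =
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
    log.info("Sent message=[{}] with offset=[{}]", message, result.getRecordMetadata().offset());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    log.error("Unable to send message=[{}] due to error: {}", message, e.getMessage());
}
Alternatively, we can register a callback so that the sending thread never blocks: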
@PostMapping(value = "/publish-with-callback")
public void sendMessageWithCallback(@RequestParam("message") String message) {
    log.info(String.format("Message with callback sent to inventory -> %s", message));
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info("Sent message=[" + message +
                "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }

        @Override
        public void onFailure(Throwable ex) {
            log.error("Unable to send message=[" + message + "] due to error: " + ex.getMessage());
        }
    });
}
Now we can call the API again, but this time it will print the messages in the logs along with the returned callback response:
curl -i -X POST \
  -H "Content-Type: application/json" \
  -d '' \
  'http://localhost:8080/inventory/publish-with-callback?message=Hello%20Inventory'
If we don't want to work with the Future, then we can also register a ProducerListener instance and pass it while creating the KafkaTemplate instance:
@Slf4j
@Configuration
public class KafkaProducerConfig {
    ...
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
        template.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(ProducerRecord<String, String> producerRecord,
                                  RecordMetadata recordMetadata) {
                log.info("ACK received from ProducerListener message: {} offset: {}",
                    producerRecord.value(), recordMetadata.offset());
            }
        });
        return template;
    }
}
RoutingKafkaTemplate
Suppose we have multiple producers with different configurations or topic names; then we can use RoutingKafkaTemplate to select the producer at runtime, based on a regex over the topic name:
@Configuration
public class KafkaProducerConfig {
    ...
    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context) {
        // ProducerFactory with Bytes serializer
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(props);
        context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);

        // ProducerFactory with String serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<Object, Object> stringPF = new DefaultKafkaProducerFactory<>(props);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile(".*-bytes"), bytesPF);
        map.put(Pattern.compile("stackabuse-.*"), stringPF);
        return new RoutingKafkaTemplate(map);
    }
}
So as we can see, RoutingKafkaTemplate takes a map of java.util.regex.Pattern to ProducerFactory instances and routes messages to the ProducerFactory matching a given topic name. We've created two patterns: .*-bytes, which will use the ByteArraySerializer, and stackabuse-.*, which will use the StringSerializer, respectively. If you noticed above, we created these topics earlier.
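As a quick usage sketch (assuming the routingTemplate bean above is injected), sending through the routing template picks the matching producer factory by topic name:
@Autowired
private RoutingKafkaTemplate routingTemplate;

public void sendSamples() {
    // Matches ".*-bytes", so the ByteArraySerializer factory is used
    routingTemplate.send("product-inventory-bytes", "inventory payload".getBytes());
    // Matches "stackabuse-.*", so the StringSerializer factory is used
    routingTemplate.send("stackabuse-product-inventory", "inventory payload");
}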
Consuming Messages from a Topic
Once we're done with publishing messages into the Kafka topic, let's quickly look into various ways to consume or receive messages from a given topic.
Message Listener Containers
We can receive messages by configuring a MessageListenerContainer as a bean. Spring Boot provides two MessageListenerContainer implementations:
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread, while the ConcurrentMessageListenerContainer delegates to multiple KafkaMessageListenerContainer instances to provide multi-threaded consumption.
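For illustration, here is a minimal sketch of wiring a KafkaMessageListenerContainer bean by hand (assuming the consumerFactory bean that we define later in this chapter):
@Bean
public KafkaMessageListenerContainer<String, String> messageListenerContainer(
        ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProperties = new ContainerProperties("product-inventory");
    // The MessageListener is invoked for every record received from the topic
    containerProperties.setMessageListener((MessageListener<String, String>) record ->
        System.out.println("Received: " + record.value()));
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
In most cases, though, we won't wire these containers directly; the @KafkaListener annotation described next creates them for us.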
@KafkaListener at Method Level
In order to receive messages asynchronously from Kafka, Spring Boot also provides the @KafkaListener annotation, which requires us to configure a ConsumerFactory and a KafkaListenerContainerFactory bean. One more thing to note is that the @EnableKafka annotation is also required on the configuration class to enable detection of the @KafkaListener annotation on Spring-managed beans.
Let's first define the KafkaConsumerConfig:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServer;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value(value = "${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetResetConfig;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}
Then we'll define a Service layer to consume the messages and print them to the console:
@Slf4j
@Service
public class ConsumeInventoryService {

    @KafkaListener(id = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.consumer.topic}",
        groupId = "${spring.kafka.consumer.group-id}",
        autoStartup = "${spring.kafka.consumer.auto-start:true}")
    public void listenMessage(String message,
            @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.GROUP_ID) String groupId,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
            message, key, topic, groupId, partition, ts);
    }
}
As you can see, we're also printing the various Kafka message headers using the KafkaHeaders object.
@KafkaListener at Class Level
In the previous section, we used @KafkaListener at the method level. But Spring Boot also allows us to define @KafkaListener at the class level. In order to do that, we must define @KafkaHandler at the method level. Usually, whenever a message is received, the method to call is determined by the converted message payload type. We can also designate a @KafkaHandler method as the default method if there is no match on the other methods.
@Slf4j
@Service
@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
    topics = "${spring.kafka.consumer.topic}",
    groupId = "multi-" + "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {

    @KafkaHandler
    void listen(String message) {
        log.info("KafkaHandler-String: {}", message);
    }

    @KafkaHandler(isDefault = true)
    Object listenDefault(Object object) {
        log.info("KafkaHandler-Default: {}", object);
        return object;
    }
}
Message Forwarder Using the @SendTo Annotation
Spring Boot also allows us to annotate a @KafkaListener with a @SendTo annotation to forward messages, as they are received, to some other topic. In order to execute this, we need to modify the configuration to accept a reply template on the Kafka listener factory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate);
    factory.setBatchListener(true);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}
And finally, we can add a @SendTo annotation along with @KafkaListener. Note that the listener methods now return the received payload, as @SendTo forwards the method's return value:
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
    topics = "${spring.kafka.consumer.topic}",
    groupId = "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}")
@SendTo("${spring.kafka.consumer.forward-to}")
public String listenMessage(String message,
        @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.GROUP_ID) String groupId,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
        message, key, topic, groupId, partition, ts);
    return message;
}
@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
    topics = "${spring.kafka.consumer.topic}",
    groupId = "multi-" + "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}")
public class ConsumeInventoryMultiListenerService {

    @KafkaHandler
    @SendTo("${spring.kafka.consumer.forward-to}")
    String listen(String message) {
        log.info("KafkaHandler-String: {}", message);
        return message;
    }

    @KafkaHandler(isDefault = true)
    @SendTo("${spring.kafka.consumer.forward-to}")
    Object listenDefault(Object object) {
        log.info("KafkaHandler-Default: {}", object);
        return object;
    }
}
Read from a Specific Partition
If we have topics with multiple partitions, then a @KafkaListener can subscribe explicitly to a particular partition of a topic with an initial offset. Note that the topics and topicPartitions attributes are mutually exclusive, so the topic is now specified only through @TopicPartition:
@Slf4j
@Service
@KafkaListener(id = "multi-" + "${spring.kafka.consumer.group-id}",
    groupId = "multi-" + "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}",
    topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
        partitionOffsets = {
            @PartitionOffset(partition = "0", initialOffset = "0"),
            @PartitionOffset(partition = "4", initialOffset = "0") }))
public class ConsumeInventoryMultiListenerService {
    ...
}
Every time this particular listener is initialized, all the previously consumed messages from partitions 0 and 4 will be re-consumed. The reason is that the initialOffset has been set to 0 for each partition offset in the listener.
Suppose there's a use-case where we don't need to set the offset; then we can simply use the partitions property to set the partitions without the offset:
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
    groupId = "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}",
    topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
        partitions = { "0", "1", "3", "6", "8" }))
@SendTo("${spring.kafka.consumer.forward-to}")
public String listenMessage(String message,
        @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.GROUP_ID) String groupId,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
        message, key, topic, groupId, partition, ts);
    return message;
}
Message Filters in Listeners
In case we need to filter the incoming messages based on a regex or some content, we can add a custom filter to the listener that we have defined by setting a RecordFilterStrategy:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setReplyTemplate(kafkaTemplate);
    factory.setBatchListener(true);
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    // Records for which this predicate returns true are discarded
    factory.setRecordFilterStrategy(
        record -> record.value().contains("Inventory"));
    return factory;
}
Then we can pass this container factory to the @KafkaListener annotation:
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
    groupId = "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}",
    topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
        partitions = { "0", "1", "3", "6", "8" }),
    containerFactory = "kafkaListenerContainerFactory")
@SendTo("${spring.kafka.consumer.forward-to}")
public String listenMessage(String message,
        @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.GROUP_ID) String groupId,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
        message, key, topic, groupId, partition, ts);
    return message;
}
Thus, all the messages that match the filter will be discarded by this listener. In this way, we can subscribe to messages from the topic by various means and further process the data at our convenience.
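If we instead want a regex-based filter, a minimal sketch (assuming a precompiled java.util.regex.Pattern) could look like this:
private static final Pattern INVENTORY_PATTERN = Pattern.compile(".*Inventory.*");
...
// Inside kafkaListenerContainerFactory(): discard any record whose value matches the pattern
factory.setRecordFilterStrategy(
    record -> INVENTORY_PATTERN.matcher(record.value()).matches());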
Custom Serializers/Deserializers
Until now, we have been sending and receiving String messages. However, Spring Boot also supports sending and receiving custom objects. But that requires additional configuration of a serializer in the ProducerFactory and a deserializer in the ConsumerFactory.
At the beginning of this chapter, we decided that we'll be working on the Inventory Service for the e-commerce app. So let's now define a simple bean class for an item to be stored in the inventory:
@ToString
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class InventoryItem {

    private String id;
    private String name;
    private int count;
    private Date listingDate;
}
Here we're using Lombok annotations to define a builder pattern and generate the constructors. We'll use this builder in our upcoming implementation.
Send Custom Messages
Let's define a JsonSerializer to serialize the object and send it as JSON content to the topic. In order to do that, we need to modify our existing ProducerFactory and update our KafkaTemplate:
@Configuration
public class KafkaProducerConfig {
    ...
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, InventoryItem> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Now we can update our Controller implementation to send an InventoryItem object through the KafkaTemplate:
@Autowired
private KafkaTemplate<String, InventoryItem> kafkaTemplate;

@PostMapping(value = "/publish")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
        .id(UUID.randomUUID().toString())
        .name(name)
        .count(count)
        .listingDate(new Date())
        .build();
    log.info(String.format("Message sent to inventory -> %s", item));
    kafkaTemplate.send(topic, item);
}
Receive Custom Messages
In order to consume the same objects, we need to update our ConsumerFactory and ConcurrentKafkaListenerContainerFactory to accept JSON objects and convert them into Java POJOs:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    ...
    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new JsonDeserializer<>(InventoryItem.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, InventoryItem> kafkaListenerContainerFactory(
            KafkaTemplate<String, InventoryItem> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, InventoryItem> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate);
        factory.setBatchListener(true);
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setRecordFilterStrategy(
            record -> record.value().getName().contains("Inventory"));
        return factory;
    }
}
Now we need to update our service layer with a @KafkaListener to consume the inventory items:
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
    groupId = "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}",
    topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
        partitions = { "0", "1", "3", "6", "8" }))
@SendTo("${spring.kafka.consumer.forward-to}")
public InventoryItem listenMessage(@Payload @Valid InventoryItem item,
        @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.GROUP_ID) String groupId,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
        item, key, topic, groupId, partition, ts);
    return item;
}
We can annotate our payload objects with @Valid to perform any kind of validation and accept or reject the messages as well.
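Note that, for @Valid to actually be enforced on listener payloads, a validator has to be registered with the listener endpoint registrar. A minimal sketch (assuming spring-boot-starter-validation is on the classpath, so that Spring Boot auto-configures a LocalValidatorFactoryBean):
@Configuration
public class KafkaListenerValidationConfig implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        // Register a JSR-303 validator so that @Valid is applied to @Payload arguments
        registrar.setValidator(this.validator);
    }
}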
Transaction Support
Kafka supports a transactional feature that ensures exactly-once message delivery between producer and consumer applications through the Transactional API. Applications that primarily exhibit the "consume-process-produce" pattern need to use transactions to support atomic operations. For example, if we have the following coding pattern, then we need to make sure of exactly-once message delivery:
1. consume(message) {
2.     process(message);
3.     template.send("topic", message1);
4.     saveInDb(message);
5.     template.send("topic", message2);
6. }
Let's take a quick look at the various possibilities where we can see exceptions:
- If there's any kind of exception at line no. 2, then the message would be consumed again when the consumer restarts. This can be resolved if the processing of the message is idempotent.
- If an exception is thrown at line no. 3, then message1 would be published again when the consumer restarts. Thus message1 is published twice.
- If an exception is thrown at line no. 4, then the message will be saved in the database again when the consumer restarts. Thus the message is saved twice, and message1 is also published twice.
- If there's an exception after line no. 4, then message2 will be published again when the consumer restarts.
Now consider that we have transactions in place: whenever any exception occurs, all the messages that have been published and the database operations that happened within the transaction will be rolled back. When the consumer restarts, all the operations within the transaction will be committed only if all the processes succeed.
Initially, Kafka used to support only at-most-once or at-least-once message delivery. But with the introduction of transactions between Kafka brokers and client applications, applications can guarantee exactly-once message delivery.
A Transaction-aware Producer
We can define a typical Kafka producer to enable transactions. First of all, we need to enable idempotence and specify a prefix for the transaction id:
spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      enable-idempotence: true
      transaction-id-prefix: tx-
Then we'll update our producer configuration to pass these values as properties to the Kafka Producer:
@Configuration
public class KafkaProducerConfig {
    ...
    @Value(value = "${spring.kafka.producer.enable-idempotence}")
    private String enableIdempotence;

    @Value(value = "${spring.kafka.producer.transaction-id-prefix}")
    private String transactionIdPrefix;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, InventoryItem> producerFactory() {
        DefaultKafkaProducerFactory<String, InventoryItem> defaultKafkaProducerFactory =
            new DefaultKafkaProducerFactory<>(producerConfigs());
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }

    @Bean
    public KafkaTransactionManager<String, InventoryItem> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
    ...
}
As we have enabled idempotence, Kafka will use this transaction id prefix as part of its algorithm to deduplicate any message that the producer sends. This setting enables Kafka to notice if the producer sends the same message to the topic more than once. We just need to make sure that the transaction id is distinct for each producer instance, while consistent across restarts.
We can also notice that we have defined a transaction manager. We need to annotate the method that sends the message with @Transactional and pass this transaction manager bean. We should also use the executeInTransaction() method instead of send() to process the transaction:
@PostMapping(value = "/publish")
@Transactional("kafkaTransactionManager")
public void sendMessage(@RequestParam("name") String name, @RequestParam("count") int count) {
    InventoryItem item = InventoryItem.builder()
        .id(UUID.randomUUID().toString())
        .name(name)
        .count(count)
        .listingDate(new Date())
        .build();
    log.info(String.format("Message sent to inventory -> %s", item));
    kafkaTemplate.executeInTransaction(t -> t.send(topic, item));
}
This registers the producer's transaction id with the broker, which can now use transactions. The broker, in turn, uses it to write transactional actions ahead to a transaction log, and will remove from that log any actions that belong to a producer with the same transaction id but from an earlier epoch.
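As an illustration of this atomicity, here is a sketch (with hypothetical item1 and item2 objects) where either both sends are committed or both are rolled back:
kafkaTemplate.executeInTransaction(operations -> {
    operations.send(topic, item1);
    // If anything after the first send throws, the first send is aborted as well
    operations.send(topic, item2);
    return true;
});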
A Transaction-aware Consumer
Similarly, we can make sure that consumers adhere to the transactions while consuming messages from the topic. Although we read all the messages from a topic partition in a particular order, we have two options to read transactional messages:
- read_committed: This allows us to read not only the messages that aren't part of a transaction, but also the transactional messages after their transaction has been committed.
- read_uncommitted: This allows us to read all the messages in offset order without waiting for transactions to be committed.
We thus need to define isolation.level in the consumer config to pass on this property. The default value of isolation.level is read_uncommitted.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      topic: product-inventory
      forward-to: fulfillment-station
      group-id: group-id
      auto-start: true
      auto-offset-reset: earliest
      enable-auto-commit: false
      isolation-level: read_committed
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    ...
    @Value(value = "${spring.kafka.consumer.isolation-level}")
    private String isolationLevel;

    @Value(value = "${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Bean
    public ConsumerFactory<String, InventoryItem> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new JsonDeserializer<>(InventoryItem.class));
    }
    ...
}
Then we can also annotate our @KafkaListener with the @Transactional annotation to make sure that it takes part in the transaction:
@KafkaListener(id = "${spring.kafka.consumer.group-id}",
    groupId = "${spring.kafka.consumer.group-id}",
    autoStartup = "${spring.kafka.consumer.auto-start:true}",
    topicPartitions = @TopicPartition(topic = "${spring.kafka.consumer.topic}",
        partitions = { "0", "1", "3", "6", "8" }))
@SendTo("${spring.kafka.consumer.forward-to}")
@Transactional("kafkaTransactionManager")
public InventoryItem listenMessage(@Payload InventoryItem item,
        @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.GROUP_ID) String groupId,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    log.info("Received message {} with key {} in topic {} as part of group {} at partition {} at timestamp {}.",
        item, key, topic, groupId, partition, ts);
    return item;
}
Conclusion
In this chapter, we took a fairly deep dive into the various features provided by Kafka support in Spring. We've also built the Inventory Service for the e-commerce app that we started with, to help out Steve and Jane.
The overall implementation for this and the previous chapter can be found on GitHub.
In the next chapter, we'll take a look at Spring Cloud Stream and its various binders that support Kafka.