
The Producer API


As mentioned in Chapter 2, the Producer is an application that publishes data or messages to Kafka. In order to explore and try out this functionality, we are going to use the Kafka Producer API to publish some messages to Kafka topics. We will primarily use the Kafka client API library, which is a thread-safe module. So let's quickly start by implementing a basic Producer instance.

Before we start with the basic implementation, as promised in our first chapter, we are going to help Steve and Jane convert their monolithic application to a message-driven architecture. So let's pick a basic microservice that we can use to start with.

Since we will be focusing on a simple Publisher module, we will start with the Notification Service, where an application notifies a User with a message for a given Order.

Simple Kafka Producer Implementation

Let's create a simple Maven-based Java application in our favorite IDE. We will add the following libraries as part of the initial POM:

	<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.1</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>1.0.2</version>
        </dependency>
    </dependencies>

To give a gist, we have used the following jars:

  • kafka_2.13-3.1.0.jar
  • kafka-clients-3.1.0.jar
  • slf4j-api-1.7.36.jar
  • slf4j-log4j12-1.7.36.jar
  • jackson-databind-2.13.1.jar
  • javafaker-1.0.2.jar

The first two libraries act as the Kafka client, slf4j is used for logging, jackson is used to handle the JSON encoding and decoding format, and javafaker is used to generate some random fake data to play around with.

Let's first add a properties file to structure the log messages that we will be printing to our console.

log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

Then we can define a constants class for the variables that we will use repeatedly in multiple places throughout our code.

public class ApplicationConstants {
    public static final String TOPIC = "notification";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092,localhost:9093,localhost:9094";
}

We need to create a topic in the Kafka cluster by executing the following command:

bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic notification \
    --config min.insync.replicas=2
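
Alternatively, the same topic can also be created programmatically through Kafka's AdminClient API. The sketch below is only an illustrative alternative to the CLI command (the class name NotificationTopicCreator is our own; it reuses the bootstrap servers from ApplicationConstants):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class NotificationTopicCreator {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3, min.insync.replicas 2 - same as the CLI command above
            final NewTopic topic = new NewTopic("notification", 3, (short) 3)
                    .configs(Collections.singletonMap("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}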

We have set the replication-factor the same as the number of partitions and the ISR value lower than the partitions. Now we will create the Notification entity object to serialize our messages.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Objects;

public class Notification {

    private final String date;
    private final String orderId;
    private final String message;
    private final String userId;
    private final String name;
    private final String email;

    public Notification(String date, String orderId, String message, String userId, String name, String email) {
        this.date = date;
        this.orderId = orderId;
        this.message = message;
        this.userId = userId;
        this.name = name;
        this.email = email;
    }

    public String getDate() {
        return date;
    }

    public String getOrderId() {
        return orderId;
    }

    public String getMessage() {
        return message;
    }

    public String getUserId() {
        return userId;
    }

    public String getName() {
        return name;
    }

    public String getEmail() {
        return email;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Notification that = (Notification) o;
        return Objects.equals(orderId, that.orderId)
                && Objects.equals(userId, that.userId)
                && Objects.equals(date, that.date)
                && Objects.equals(message, that.message)
                && Objects.equals(name, that.name)
                && Objects.equals(email, that.email);
    }

    @Override
    public int hashCode() {
        return Objects.hash(date, orderId, message, userId, name, email);
    }

    @Override
    public String toString() {
        return "Notification{" +
                "date=" + date +
                ", orderId=" + orderId +
                ", message='" + message + '\'' +
                ", userId=" + userId +
                ", name='" + name + '\'' +
                ", email='" + email + '\'' +
                '}';
    }

    public String toJson() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(this);
    }
}

Now we will create a simple Producer implementation to publish a notification to our topic.

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Properties;

public class SimpleKafkaProducerAPI {

    public static void main(String[] args) throws JsonProcessingException {
        final Logger logger = LoggerFactory.getLogger(SimpleKafkaProducerAPI.class);
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        final Producer<String, String> producer = new KafkaProducer<>(props);
        logger.info("=================================================================");
        logger.info("Starting Kafka Producer Process");
        producer.send(
                new ProducerRecord<>(
                    ApplicationConstants.TOPIC,
                    "111-11111-1111",
                    new Notification(
                        new Date().toString(),
                        "111-11111-1111",
                        "Your order has been processed and shipped!!",
                        "222-2222-2222",
                        "John Doe",
                        "john.doe@example.com"
                    ).toJson()
                )
        );
        logger.info("Flushing and closing producer");
        logger.info("=================================================================");
        producer.flush();
        producer.close();
    }
}

So we just instantiated a java.util.Properties object where we loaded the Bootstrap Server details and a String Serializer for both key and value. Next, we converted our Notification object to a JSON String and added it to a ProducerRecord. We also provided the topic details as part of the same ProducerRecord. Finally, we invoked the send() method, which acts as an asynchronous method to publish the producer record. When we call this method, it adds the record to an output buffer and returns immediately. This output buffer is used to batch the records for efficient IO and compression of messages. Finally, we call the flush() method to send any outstanding batches and the close() method to close the connection. By default, closing the producer also flushes the messages.
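
Since send() returns immediately, in practice we often also want to know whether the record actually reached the broker. One way to do that, shown purely as a sketch here (it assumes the Notification built above is held in a local variable called notification, and reuses the logger from the main method), is to pass a Callback along with the record:

producer.send(
        new ProducerRecord<>(ApplicationConstants.TOPIC, "111-11111-1111", notification.toJson()),
        (metadata, exception) -> {
            if (exception != null) {
                // The send failed even after any configured retries
                logger.error("Failed to publish notification", exception);
            } else {
                logger.info("Published to topic={} partition={} offset={}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
);

The callback is invoked on the producer's IO thread once the broker responds, so it should stay lightweight.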

Advanced Kafka Producer Configuration

We just created a simple message publisher that works with default settings. But we want to create an enterprise-grade microservice to support Steve and Jane's microservice ecosystem. So let's try to enhance this code to also perform automated batch message publishing, generate a report of internal Kafka metrics, introduce an interceptor, and add a custom serializer.

So let's divide the whole implementation into six main parts:

  • Custom Serializer
  • Batching and Compression
  • Retries and Timeouts
  • Durability of Records
  • Custom Interceptor
  • Metrics Generator

Custom Serializer

The process of converting objects into bytes is called serialization. Basically, this process transforms the content into information that can be transmitted and interpreted. Kafka ships with some pre-built serializers for various data types:

  • StringSerializer
  • ShortSerializer
  • IntegerSerializer
  • LongSerializer
  • DoubleSerializer
  • BytesSerializer


Kafka also provides the capability to implement a custom serializer or deserializer by implementing the Serializer interface for serialization and the Deserializer interface for deserialization. We need to override three methods:

  • serialize/deserialize: holds the actual implementation to convert the intended object into bytes, and vice-versa for deserialization.
  • configure: takes care of the configuration details, if any.
  • close: used to close the Kafka session.

Let's implement our custom serializer using the above interface and name it NotificationSerializer:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;

public class NotificationSerializer implements Serializer<Notification> {

    private static final Logger logger = LoggerFactory.getLogger(NotificationSerializer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Notification data) {
        try {
            if (Objects.isNull(data)) {
                logger.error("Received null value while serializing");
                return new byte[0];
            }
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Notification to byte[]");
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public void close() {
    }
}
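
For completeness, the consuming side would implement the matching Deserializer interface in the same fashion. The sketch below is only an assumption of how that counterpart could look (we will really need it in the Consumer chapter), and it assumes Notification can be rebuilt by Jackson, e.g. via a no-arg constructor or @JsonCreator annotations:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class NotificationDeserializer implements Deserializer<Notification> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Notification deserialize(String topic, byte[] data) {
        try {
            if (data == null || data.length == 0) {
                return null;
            }
            // Assumes Notification is Jackson-deserializable
            return objectMapper.readValue(data, Notification.class);
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Notification", e);
        }
    }
}

The configure and close methods have default implementations in the Deserializer interface, so only deserialize needs to be overridden here.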

Then we can define a method where we load the Kafka properties and use this serializer to parse the values of a Kafka record.

private static void setupBootstrapAndSerializers(Properties props) {
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);
   props.put(ProducerConfig.CLIENT_ID_CONFIG, "NotificationKafkaProducer");
   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

   //Custom Serializer - config "value.serializer"
   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NotificationSerializer.class.getName());
}

Batching and Compression

Kafka stores the records temporarily in an output buffer before publishing the messages to the broker in a batch. By default, 32 MB of memory is allocated to buffer the data to be sent to the broker. Kafka allows batching of records either by size or by time. Both of these settings can be tuned to increase throughput and performance.

By default, Kafka uses a batch size of 16 KB. We can use a larger batch size to make the compression of data more efficient. If a record is larger than the configured batch size, it will not be batched. Batches are configured per partition; hence, the smaller the batch size, the lower the throughput and performance. The batch.size parameter is used for this functionality.

Kafka also provides a setting to define a time interval for which the Producer will wait before sending the records from the buffer if the batch size has not been reached. The property linger.ms is used for this purpose. This allows the Producer to add a slight delay so that more records can pile up and larger batches of records can be flushed out of the buffer.

By default, the Producer doesn't compress the data. We can achieve end-to-end compression so that the overall load can be taken off the broker. The compressed data can originate at the Producer, be written to the topic, and be forwarded to a consumer by the broker in the same compressed format. On the Producer side, Kafka has the compression.type property, which supports none, gzip, snappy or lz4. In order to achieve end-to-end compression, the Kafka broker config compression.type needs to be set to producer.

Combining all of these properties, we can define a method that takes care of batching and compression.

private static void setupBatchingAndCompression(final Properties props) {
    //Linger up to 500 ms before sending the batch if the size threshold is not met
    props.put(ProducerConfig.LINGER_MS_CONFIG, 500);

    //Batch up to 64 KB buffer sizes.
    props.put(ProducerConfig.BATCH_SIZE_CONFIG,  16_384 * 4);

    //Use Snappy compression for batch compression.
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
}

Retries and Timeouts

Kafka provides settings to retry sending records if the Producer doesn't get an acknowledgement from the broker. But we need to keep a few things in mind while setting this parameter. The property retries defaults to 0 in older client versions (newer clients default it to a very large value bounded by delivery.timeout.ms). If the retry value is set greater than 0, then we also need to set max.in.flight.requests.per.connection to 1, otherwise there is a possibility that a retried message could be delivered out of order. Finally, we should also note that retries happen within timeouts, and we can set the retry.backoff.ms property to define how long to wait after a failure before the record is sent again.

We can also define a request timeout by setting the request.timeout.ms value, which bounds how long the producer waits for the broker's response, including the time the broker spends waiting for follower confirmation to satisfy the acknowledgement setting. So let's define a method to handle the retry and timeout properties for the Kafka Producer.

private static void setupRetriesInFlightTimeout(Properties props) {
    //Only one in-flight request per Kafka broker connection
    // - max.in.flight.requests.per.connection (default 5)
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

    //Set the number of retries - retries
    props.put(ProducerConfig.RETRIES_CONFIG, 3);

    //Request timeout - request.timeout.ms
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 15_000);

    //Only retry after one second.
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1_000);
}

Durability of Records

Durability is a tradeoff between throughput and consistency. In order to maintain the durability of the records being produced, Kafka provides a setting through which we can configure the acknowledgement. This setting lets us require a write-acknowledgement from the partition leader before the producer's write request is deemed complete.

Kafka provides a parameter, acks, which controls the number of partition replicas that must receive the record before the producer considers the write to the partition successful. This option has a significant impact on how likely messages are to be lost. It provides three options (a small configuration sketch follows the list):

  • acks 0 (None): The Producer won't wait for any acknowledgement from the Kafka broker at all. Thus there is no guarantee of durability, since records will be lost if the leader goes down.
  • acks 1 (Leader): The Kafka broker acknowledges once the partition leader has written the record to its local log, but it responds without the partition followers confirming whether the write has been successful. If the leader fails right after sending the acknowledgement, the record could be lost because the followers might not have replicated it yet.
  • acks -1 (All): The leader waits for write confirmation from all the in-sync replicas before sending an acknowledgement back to the producer. This option ensures that a record is not lost as long as at least one in-sync replica is alive. Most commonly, we want to use acks as all and min.insync.replicas greater than 1.
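
On the producer side this boils down to a single property. The helper below is just our own sketch (the name setupDurability is not from the original code, and the same acks value is also set inline in createProducer further down); min.insync.replicas itself is a topic/broker-level setting, which we already applied when creating the topic:

private static void setupDurability(final Properties props) {
    //Wait for all in-sync replicas to confirm the write - acks
    //Combined with min.insync.replicas=2 on the topic, one replica can fail
    //without losing acknowledged records.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
}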

Custom Interceptor

The Kafka Producer provides the ProducerInterceptor interface, which intercepts the records being sent by the producer to the broker. It also intercepts after the acknowledgement is sent by the broker. We can mutate the records with an interceptor. The ProducerInterceptor interface provides onSend() and onAcknowledgement(), which can be overridden to add our own logic. So let's define a NotificationProducerInterceptor class:

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class NotificationProducerInterceptor implements ProducerInterceptor {

    private final Logger logger = LoggerFactory.getLogger(NotificationProducerInterceptor.class);
    private int onSendCount;
    private int onAckCount;

    @Override
    public ProducerRecord onSend(final ProducerRecord record) {
        onSendCount++;
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onSend topic=%s key=%s value=%s %d %n",
                    record.topic(), record.key(), record.value().toString(),
                    record.partition()
            ));
        } else {
            if (onSendCount % 100 == 0) {
                logger.info(String.format("onSend topic=%s key=%s value=%s %d %n",
                        record.topic(), record.key(), record.value().toString(),
                        record.partition()
                ));
            }
        }
        return record;
    }

    @Override
    public void onAcknowledgement(final RecordMetadata metadata,
                                  final Exception exception) {
        onAckCount++;
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("onAck topic=%s, part=%d, offset=%d %n",
                    metadata.topic(), metadata.partition(), metadata.offset()
            ));
        } else {
            if (onAckCount % 100 == 0) {
                logger.info(String.format("onAck topic=%s, part=%d, offset=%d %n",
                        metadata.topic(), metadata.partition(), metadata.offset()
                ));
            }
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

Finally, we can use this interceptor and pass it in the Kafka properties. We will also add the acks config discussed above as part of this method.

private static Producer<String, Notification> createProducer() {
    final Properties props = new Properties();
    setupBootstrapAndSerializers(props);
    setupBatchingAndCompression(props);
    setupRetriesInFlightTimeout(props);

    //Set number of acknowledgements - acks - default is all
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    //Install interceptor list - config "interceptor.classes"
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, NotificationProducerInterceptor.class.getName());

    return new KafkaProducer<>(props);
}

Metrics Generator

Kafka provides a metrics() method that returns a map of metrics as Map<MetricName, ? extends Metric>. This returns the complete set of producer metrics. The MetricName consists of a name, group, description, and tags. A Metric consists of a metric name and a value. So we will create an object to pair the metric name and metric, which we call MetricPair:

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class MetricPair {
    private final MetricName metricName;
    private final Metric metric;

    public MetricPair(MetricName metricName, Metric metric) {
        this.metricName = metricName;
        this.metric = metric;
    }

    public MetricName getMetricName() {
        return metricName;
    }

    public Metric getMetric() {
        return metric;
    }

    public String toString() {
        return metricName.group() + "." + metricName.name();
    }

    public static final Map<String, MetricPair> metricsDisplayer(Set<String> metricsNameFilter,
                                                          Map<MetricName, ? extends Metric> metrics) {
        return metrics
                .entrySet()
                .stream()
                //Filter out metrics not in metricsNameFilter
                .filter(metricNameEntry -> metricsNameFilter.contains(metricNameEntry.getKey().name()))
                //Turn Map<MetricName, Metric> into TreeMap<String, MetricPair>
                .map(entry -> new MetricPair(entry.getKey(), entry.getValue()))
                .collect(Collectors.toMap(MetricPair::toString, it -> it, (a, b) -> a, TreeMap::new));
    }
}

Next, we will define a MetricsGenerator class that periodically prints the output metrics for a producer.

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

public class MetricsGenerator implements Runnable {
    private final Producer<String, Notification> producer;
    private final Logger logger = LoggerFactory.getLogger(MetricsGenerator.class);

    // Filter only the metrics that we require
    private final Set<String> metricsNameFilter = new HashSet<>(Arrays.asList(
            "record-queue-time-avg", "record-send-rate", "records-per-request-avg",
            "request-size-max", "network-io-rate", "record-queue-time-avg",
            "incoming-byte-rate", "batch-size-avg", "response-rate", "requests-in-flight"
    ));

    public MetricsGenerator(final Producer<String, Notification> producer) {
        this.producer = producer;
    }

    @Override
    public void run() {
        while (true) {
            final Map<MetricName, ? extends Metric> metrics = producer.metrics();
            displayAllMetrics(metrics);
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                logger.warn("metrics interrupted");
                Thread.interrupted();
                break;
            }
        }
    }

    private void displayAllMetrics(Map<MetricName, ? extends Metric> metrics) {
        final Map<String, MetricPair> metricsDisplayMap = MetricPair.metricsDisplayer(metricsNameFilter, metrics);

        //Output metrics
        final StringBuilder builder = new StringBuilder(255);
        builder.append("\n---------------------------------------\n");
        metricsDisplayMap.forEach((name, metricPair) -> builder
                .append(String.format(Locale.US, "%50s%25s\t\t%s\t\t%s%n",
                    name,
                    metricPair.getMetricName().name(),
                    metricPair.getMetric().metricValue(),
                    metricPair.getMetricName().description()
        )));
        builder.append("\n---------------------------------------\n");
        logger.info(builder.toString());
    }
}

Now let's implement the final pieces to integrate all of this and complete our microservice. As we discussed earlier, we want to automate this and send continuous records through the Producer. We will generate some random fake records and publish them to Kafka with some delay. So we will start with the NotificationSender class:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class NotificationSender implements Runnable {

    private final Logger logger = LoggerFactory.getLogger(NotificationSender.class);
    private final Producer<String, Notification> producer;
    private final int delayMinMs;
    private final int delayMaxMs;
    private final String topic;

    public NotificationSender(final String topic,
                       final Producer<String, Notification> producer,
                       final int delayMinMs,
                       final int delayMaxMs) {
        this.producer = producer;
        this.delayMinMs = delayMinMs;
        this.delayMaxMs = delayMaxMs;
        this.topic = topic;
    }


    public void run() {
        final Random random = new Random(System.currentTimeMillis());
        int sentCount = 0;

        while (true) {
            sentCount++;

            try {
                final ProducerRecord<String, Notification> record = createRandomRecord();
                final int delay = randomIntBetween(random, delayMaxMs, delayMinMs);
                logger.info("Sending record: {}", record);
                final Future<RecordMetadata> future = producer.send(record);
                if (sentCount % 100 == 0) {
                    displayRecordMetaData(record, future);
                }
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                if (Thread.interrupted()) {
                    break;
                }
            } catch (ExecutionException e) {
                logger.error("problem sending record to producer", e);
            } catch (ParseException | JsonProcessingException e) {
                logger.error("problem while parsing the record", e);
            }
        }
    }

    private void displayRecordMetaData(final ProducerRecord<String, Notification> record,
                                       final Future<RecordMetadata> future)
            throws InterruptedException, ExecutionException {
        final RecordMetadata recordMetadata = future.get();
        logger.info(String.format("\n\t\t\tkey=%s, value=%s " +
                        "\n\t\t\tsent to topic=%s part=%d off=%d at time=%s",
                record.key(),
                record.value(),
                recordMetadata.topic(),
                recordMetadata.partition(),
                recordMetadata.offset(),
                new Date(recordMetadata.timestamp())
        ));
    }

    private int randomIntBetween(final Random random,
                                 final int max,
                                 final int min) {
        return random.nextInt(max - min + 1) + min;
    }

    private ProducerRecord<String, Notification> createRandomRecord()
            throws ParseException, JsonProcessingException {
        Faker faker = new Faker();
        String orderId = faker.idNumber().valid();
        final Notification notification = new Notification(
                faker.date().past(5, TimeUnit.DAYS).toString(),
                orderId,
                "Your order has been processed and shipped!!",
                faker.idNumber().valid(),
                faker.gameOfThrones().character(),
                faker.internet().safeEmailAddress()
        );
        return new ProducerRecord<>(topic, orderId, notification);
    }
}

Next, let's define a method that collects the senders in a list so we can use them later.

private static List<NotificationSender> getNotificationSenderList(
    final Producer<String, Notification> producer) {
    return Collections.singletonList(new NotificationSender(
            ApplicationConstants.TOPIC,
            producer,
            1,
            10
    ));
}

Finally, we will define our main method, which runs a thread pool to publish these records at an interval.

public static void main(String[] args) {
    logger.info("========================================================================================");
    logger.info("Starting Kafka Producer Process");

    //Create Kafka Producer
    final Producer<String, Notification> producer = createProducer();

    //Create NotificationSender list
    final List<NotificationSender> notificationSenders = getNotificationSenderList(producer);

    //Create a thread pool so every notification sender gets its own thread.
    // Increase by 1 to fit the metrics generator.
    final ExecutorService executorService =
            Executors.newFixedThreadPool(notificationSenders.size() + 1);

    //Run the Metrics Generator, which is a Runnable wrapping the producer.
    executorService.submit(new MetricsGenerator(producer));

    //Run each notification sender in its own thread.
    notificationSenders.forEach(executorService::submit);

    //Register graceful shutdown of the thread pool, then flush and close the producer.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        executorService.shutdown();
        try {
            executorService.awaitTermination(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            logger.warn("shutting down", e);
        } finally {
            logger.info("Flushing and closing producer");
            logger.info("========================================================================================");
            producer.flush();
            producer.close();
        }
    }));
}

This completes our Notification microservice implementation. Now if we run the application, we can see the logs being printed in the console. A few sample logs look like the following:

Notification Sender and Interceptor:

22/02/23 21:43:38 INFO producer.NotificationSender: Sending record: ProducerRecord(topic=notification, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=802-07-5549, value=Notification{date=Mon Feb 21 05:50:31 IST 2022, orderId=802-07-5549, message='Your order has been processed and shipped!!', userId=080-28-2988, name='Byan Votyris', email='nguyet.keeling@example.com'}, timestamp=null)
22/02/23 21:43:38 DEBUG producer.NotificationProducerInterceptor: onSend topic=notification key=802-07-5549 value=Notification{date=Mon Feb 21 05:50:31 IST 2022, orderId=802-07-5549, message='Your order has been processed and shipped!!', userId=080-28-2988, name='Byan Votyris', email='nguyet.keeling@example.com'} null 

22/02/23 21:43:39 INFO producer.NotificationSender: Sending record: ProducerRecord(topic=notification, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=152-38-1697, value=Notification{date=Sun Feb 20 06:47:49 IST 2022, orderId=152-38-1697, message='Your order has been processed and shipped!!', userId=434-86-1947, name='Bryan of Oldtown', email='jeramy.bayer@example.com'}, timestamp=null)
22/02/23 21:43:39 DEBUG producer.NotificationProducerInterceptor: onSend topic=notification key=152-38-1697 value=Notification{date=Sun Feb 20 06:47:49 IST 2022, orderId=152-38-1697, message='Your order has been processed and shipped!!', userId=434-86-1947, name='Bryan of Oldtown', email='jeramy.bayer@example.com'} null

22/02/23 21:43:40 DEBUG producer.NotificationProducerInterceptor: onAck topic=notification, part=0, offset=31869 

22/02/23 21:43:40 DEBUG producer.NotificationProducerInterceptor: onAck topic=notification, part=0, offset=31870

Metrics Generator:

22/02/23 21:43:40 INFO producer.MetricsGenerator: 
---------------------------------------
                   producer-metrics.batch-size-avg           batch-size-avg		542.0		The average number of bytes sent per partition per-request.
               producer-metrics.incoming-byte-rate       incoming-byte-rate		27.641464969918765		The number of bytes read off all sockets per second
                  producer-metrics.network-io-rate          network-io-rate		0.24952866807142066		The number of network operations (reads or writes) on all connections per second
            producer-metrics.record-queue-time-avg    record-queue-time-avg		1674.5		The average time in ms record batches spent in the send buffer.
                 producer-metrics.record-send-rate         record-send-rate		0.1940778529444383		The average number of records sent per second.
          producer-metrics.records-per-request-avg  records-per-request-avg		3.5		The average number of records per request.
                 producer-metrics.request-size-max         request-size-max		678.0		The maximum size of any request sent.
               producer-metrics.requests-in-flight       requests-in-flight		1.0		The current number of in-flight requests awaiting a response.
                    producer-metrics.response-rate            response-rate		0.11088011088011088		The number of responses received per second
          producer-node-metrics.incoming-byte-rate       incoming-byte-rate		14.497172635547178		The number of incoming bytes per second
            producer-node-metrics.request-size-max         request-size-max		65.0		The maximum size of any request sent.
               producer-node-metrics.response-rate            response-rate		0.05544005544005544		The number of responses received per second
           producer-topic-metrics.record-send-rate         record-send-rate		0.2279833246482543		The average number of records sent per second for a topic.

---------------------------------------

This completes our overall Producer API implementation. We will take a look at the Consumer API in the next chapter. All of the code can be found on GitHub.
