
The Streams API – Writing the Processor and the KStreams API


After looking into the basic Producer and Consumer APIs, let us have a look at the Kafka Streams API. With the explosion of more and more big data frameworks and data streaming technologies, client-side architectures needed a lot of power to process huge clusters of data every single day. Thus the ability to process large quantities of data in bulk was introduced. This concept of bulk processing alone was not sufficient for the industry, which was in constant need of real-time event processing rather than just traditional batch processing. Thus a new technique, called micro-batching, was introduced. Micro-batching is the same as actual batch processing, but it handles smaller quantities of data. By reducing the size of the data, it usually produces results quickly at faster intervals, but it does not give you real-time per-event processing power.

Thus everyone started looking for stream processing, so that the data could be processed as and when it arrives in the system. Many streaming libraries have been introduced in the market, such as Spark Streaming, NiFi, Flink, Storm, Samza, etc. Most of them were either micro-batching or per-event processing frameworks. Kafka then introduced the Kafka Streams library, one of the few streaming libraries that offers exactly-once processing guarantees. It is a true streaming framework, as it processes one record at a time rather than in micro-batches.

A few of the characteristics of Kafka Streams are:

  • It is quite lightweight and simple. It can be embedded in any kind of Java application and integrated into any existing tools or frameworks.
  • We just need to add the external dependency of kafka-streams and nothing else. No other dependency is required.
  • It provides fault-tolerant local state stores to perform stateful operations like windowed joins and aggregations.
  • Each record is processed exactly once even when there is some kind of failure (a minimal configuration sketch for this follows the list).
  • Its ability to process one record at a time helps in achieving millisecond processing latency.
  • It also supports event-time-based windowing operations, even for records that have arrived out of order.
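
Exactly-once semantics are opt-in: they are switched on through the processing.guarantee setting. As a minimal sketch (assuming a Kafka Streams 3.x application and a cluster that can support transactions; imports omitted as elsewhere in this post), enabling it would look roughly like this:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-processor-api");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// "exactly_once_v2" is the value recommended on the 3.x line; older releases used "exactly_once"
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);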

Core Concepts of Kafka Streams

  • Processor Topology – The entire Kafka Streams application is represented in the form of a processor topology, which is simply a graph of stream processors connected by streams. The overall computational logic is expressed as this topology.
  • Stream – A sequential, fully-ordered, immutable sequence of data records that can be replayed in case of failures is known as a Stream. Each data record in it corresponds to a Kafka record, which is defined as a key-value pair.
  • Stream Processor – This is a node in the processor topology which receives one record at a time from its upstream nodes in the topology, transforms the record, and subsequently produces one or more output records to its downstream nodes.
  • Source Processor – This is a special type of stream processor which consumes data directly from Kafka topics and forwards it to downstream processors. It is not responsible for any kind of transformation; it just consumes data from the topic and sends it to other processors for transformation.
  • Sink Processor – This processor receives enriched or transformed data from an upstream processor and sends it to Kafka topics (a minimal topology sketch tying these terms together follows this list).
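
To make these terms concrete, here is a minimal sketch of a processor topology: a source processor reading from an "input" topic, one stream processor, and a sink processor writing to an "output" topic. The topic names and the UppercaseProcessor are purely illustrative and are not part of the payment example built later in this post (imports are omitted, as in the rest of the code here).

public class TopologySketch {

    // A hypothetical stream processor that simply upper-cases String values
    static class UppercaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
        }
    }

    public static Topology buildTopology() {
        Topology topology = new Topology();
        topology
                // Source processor: consumes records from the "input" topic
                .addSource("Source", new StringDeserializer(), new StringDeserializer(), "input")
                // Stream processor: receives one record at a time from "Source" and transforms it
                .addProcessor("Process", UppercaseProcessor::new, "Source")
                // Sink processor: writes the transformed records to the "output" topic
                .addSink("Sink", "output", new StringSerializer(), new StringSerializer(), "Process");
        return topology;
    }
}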

How to write a Topology?

A topology can be written in Kafka Streams in two ways:

  • Low-level Processor API – This provides low-level APIs for data processing and local state storage.
  • High-level DSL – This provides high-level APIs, known as the KStreams DSL, to perform common transformation operations like filter, map, join, etc. A KStream is a record stream where each key-value pair is an independent record.

Before we start with the basic implementation of each Kafka Streams API, we are going to look again into Steve and Jane's e-commerce app and pick a basic microservice that we can use to start with.

Microservice E-commerce Event-Driven app

Since we will be focusing on a simple streaming module, let's pick the Payment Service, where we will pick up the transaction records and process them to enrich and transform the data. First, we will consume the continuous stream of transactions, mask the card details and extract a transaction pattern. We will also extract the customer and their frequent purchase information so that it can be further utilized for any kind of rewards program.

DAG of Payment Service

Kafka Streams – The Processor API implementation

The Processor API was primarily aimed at writing a client that can process data consumed from Kafka and write the results back to Kafka. So, in order to get familiar with the APIs, let's start by defining the steps to build a processing graph to handle the incoming transactions in our system.

  • First we need to define a source from which we can consume our records. We can create a "transaction" topic and consume our initial records from it.
  • Next we can define serializers and deserializers to convert the records into the desired JSON objects to produce into downstream topics.
  • Then we can define the generic objects and the utility methods that we will be using throughout our implementation.
  • Next we will define various processors to intercept the records and transform them into the desired objects.
  • Finally, we need to define a directed acyclic graph of these processors to process the records and form a topology.

Initializing the Implementation

So let's quickly create a topic in Kafka named "transaction" and define a few faker-backed methods to produce some fake data into this topic.

bin/kafka-topics.sh \
    --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 3 \
    --partitions 3 \
    --topic transaction \
    --config min.insync.replicas=2

Let's begin our basic implementation by creating a Maven project and adding the following dependencies:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>com.github.javafaker</groupId>
            <artifactId>javafaker</artifactId>
            <version>1.0.2</version>
        </dependency>

We are using almost the same dependencies that we used in the Producer and Consumer implementations; additionally, we use the kafka-streams library. Now we can define application constants to reuse these values in the rest of the implementation.

public class ApplicationConstants {
    public static final String SOURCE_TOPIC = "transaction";
    public static final String SINK1_TOPIC = "patterns";
    public static final String SINK2_TOPIC = "rewards";
    public static final String SINK3_TOPIC = "transactions";
    public static final String STORE_NAME = "store";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
}

Producing Fake Data into the Topic

Next, we can define a fake data producer implementation to produce some data into our transaction topic.

public class FakeDataProducer {

    private static final Logger logger = LoggerFactory.getLogger(FakeDataProducer.class);

    private static Producer<String, String> producer;
    private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
    private static ExecutorService executorService = Executors.newFixedThreadPool(1);
    private static Callback callback;
    private static final String TRANSACTIONS_TOPIC = ApplicationConstants.SOURCE_TOPIC;
    private static volatile boolean keepRunning = true;
    private static final Supplier<Date> timestampGenerator = () -> new Faker().date().past(15, TimeUnit.MINUTES, new Date());

    public static List<Transaction> generateTransactions(int number, int numberCustomers) {
        List<Transaction> transactions = new ArrayList<>();
        Faker faker = new Faker();
        List<Customer> customers = generateCustomers(numberCustomers);

        Random random = new Random();
        for (int i = 0; i < number; i++) {
            String itemPurchased = faker.commerce().productName();
            int quantity = faker.number().numberBetween(1, 5);
            double price = Double.parseDouble(faker.commerce().price(4.00, 295.00));
            Date purchaseDate = timestampGenerator.get();

            Customer customer = customers.get(random.nextInt(numberCustomers));
            String zipCode = faker.options().option("471975", "976663", "113469", "334457");

            Transaction transaction = Transaction
                    .builder()
                    .cardNumber(customer.getCardNumber())
                    .customerId(customer.getCustomerId())
                    .firstName(customer.getFirstName())
                    .lastName(customer.getLastName())
                    .itemPurchased(itemPurchased)
                    .quantity(quantity)
                    .price(price)
                    .purchaseDate(purchaseDate)
                    .zipCode(zipCode)
                    .build();
            transactions.add(transaction);
        }
        return transactions;
    }

    public static List<Customer> generateCustomers(int numberCustomers) {
        List<Customer> customers = new ArrayList<>(numberCustomers);
        Faker faker = new Faker();
        List<String> cards = generateCardNumbers(numberCustomers);
        for (int i = 0; i < numberCustomers; i++) {
            Name name = faker.name();
            customers.add(new Customer(name.firstName(), name.lastName(), faker.idNumber().valid(), cards.get(i)));
        }
        return customers;
    }

    private static List<String> generateCardNumbers(int numberCards) {
        int counter = 0;
        Pattern visaMasterCardAmex = Pattern.compile("(\\d{4}-){3}\\d{4}");
        List<String> cardNumbers = new ArrayList<>(numberCards);
        Finance finance = new Faker().finance();
        while (counter < numberCards) {
            String cardNumber = finance.creditCard();
            if (visaMasterCardAmex.matcher(cardNumber).matches()) {
                cardNumbers.add(cardNumber);
                counter++;
            }
        }
        return cardNumbers;
    }

    public static void produceTransactionsData() {
        produceTransactionData(
                100,
                10,
                100);
    }

    public static void produceTransactionData(int numberPurchases, int numberIterations, int numberCustomers) {
        Runnable generateTask = () -> {
            init();
            int counter = 0;
            while (counter++ < numberIterations && keepRunning) {
                List<Transaction> transactions = generateTransactions(numberPurchases, numberCustomers);
                List<String> jsonValues = convertToJson(transactions);
                for (String value : jsonValues) {
                    Gson gson = new Gson();
                    String key = gson.fromJson(value, Transaction.class).getCustomerId();
                    ProducerRecord<String, String> record = new ProducerRecord<>(TRANSACTIONS_TOPIC, key, value);
                    producer.send(record, callback);
                }
                logger.info("Record batch sent");
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
            logger.info("Done generating transaction data");

        };
        executorService.submit(generateTask);
    }

    public static void shutdown() {
        logger.info("Shutting down data generation");
        keepRunning = false;
        if (Objects.nonNull(executorService)) {
            executorService.shutdownNow();
            executorService = null;
        }
        if (Objects.nonNull(producer)) {
            producer.close();
            producer = null;
        }

    }

    private static void init() {
        if (Objects.isNull(producer)) {
            logger.info("Initializing the producer");
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.ACKS_CONFIG, "1");
            properties.put(ProducerConfig.RETRIES_CONFIG, "3");

            producer = new KafkaProducer<>(properties);

            callback = (metadata, exception) -> {
                if (Objects.nonNull(exception)) {
                    exception.printStackTrace();
                }
            };
            logger.info("Producer initialized");
        }
    }


    private static <T> List<String> convertToJson(List<T> generatedDataItems) {
        List<String> jsonList = new ArrayList<>();
        for (T generatedData : generatedDataItems) {
            jsonList.add(convertToJson(generatedData));
        }
        return jsonList;
    }

    private static <T> String convertToJson(T generatedDataItem) {
        return gson.toJson(generatedDataItem);
    }
}

Defining Objects

As you can see, we have referred to the Customer and Transaction objects, so let's define these objects as well.

public class Customer {
    private final String firstName;
    private final String lastName;
    private final String customerId;
    private final String cardNumber;

    public Customer(String firstName, String lastName, String customerId, String cardNumber) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.customerId = customerId;
        this.cardNumber = cardNumber;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public String getCustomerId() {
        return customerId;
    }

    public String getCardNumber() {
        return cardNumber;
    }
}
public class Transaction {

    private final String firstName;
    private final String lastName;
    private final String cardNumber;
    private final String customerId;
    private final String itemPurchased;
    private final int quantity;
    private final double price;
    private final Date purchaseDate;
    private final String zipCode;

    private Transaction(Builder builder) {
        firstName = builder.firstName;
        lastName = builder.lastName;
        cardNumber = builder.cardNumber;
        customerId = builder.customerId;
        itemPurchased = builder.itemPurchased;
        quantity = builder.quantity;
        price = builder.price;
        purchaseDate = builder.purchaseDate;
        zipCode = builder.zipCode;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static Builder builder(Transaction copy) {
        Builder builder = new Builder();
        builder.firstName = copy.firstName;
        builder.lastName = copy.lastName;
        builder.cardNumber = copy.cardNumber;
        builder.customerId = copy.customerId;
        builder.itemPurchased = copy.itemPurchased;
        builder.quantity = copy.quantity;
        builder.price = copy.price;
        builder.purchaseDate = copy.purchaseDate;
        builder.zipCode = copy.zipCode;
        return builder;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public String getCardNumber() {
        return cardNumber;
    }

    public String getCustomerId() { return customerId; }

    public String getItemPurchased() {
        return itemPurchased;
    }

    public int getQuantity() {
        return quantity;
    }

    public double getPrice() {
        return price;
    }

    public Date getPurchaseDate() {
        return purchaseDate;
    }

    public String getZipCode() {
        return zipCode;
    }

    @Override
    public String toString() {
        return "Purchase{" +
                "firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                ", cardNumber='" + cardNumber + '\'' +
                ", customerId='" + customerId + '\'' +
                ", itemPurchased='" + itemPurchased + '\'' +
                ", quantity=" + quantity +
                ", price=" + price +
                ", purchaseDate=" + purchaseDate +
                ", zipCode='" + zipCode + '\'' +
                '}';
    }

    public static final class Builder {
        private String firstName;
        private String lastName;
        private String cardNumber;
        private String customerId;
        private String itemPurchased;
        private int quantity;
        private double price;
        private Date purchaseDate;
        private String zipCode;

        private static final String CC_NUMBER_REPLACEMENT = "xxxx-xxxx-xxxx-";

        private Builder() {
        }

        public Builder firstName(String val) {
            firstName = val;
            return this;
        }

        public Builder lastName(String val) {
            lastName = val;
            return this;
        }

        public Builder maskCard() {
            Objects.requireNonNull(this.cardNumber, "Card can't have a null value");
            String last4Digits = this.cardNumber.split("-")[3];
            this.cardNumber = CC_NUMBER_REPLACEMENT + last4Digits;
            return this;
        }

        public Builder cardNumber(String val) {
            cardNumber = val;
            return this;
        }

        public Builder customerId(String val) {
            customerId = val;
            return this;
        }

        public Builder itemPurchased(String val) {
            itemPurchased = val;
            return this;
        }

        public Builder quantity(int val) {
            quantity = val;
            return this;
        }

        public Builder price(double val) {
            price = val;
            return this;
        }

        public Builder purchaseDate(Date val) {
            purchaseDate = val;
            return this;
        }

        public Builder zipCode(String val) {
            zipCode = val;
            return this;
        }

        public Transaction build() {
            return new Transaction(this);
        }
    }
}

As you can see, we have defined a utility method to mask the card details as well. The Transaction object follows the Builder pattern for ease of initialization in multiple places. We can also define the other objects, TransactionPattern and RewardAccumulator, for our downstream processing of the transaction data.

public class TransactionPattern {

    private final String zipCode;
    private final String item;
    private final Date date;

    private TransactionPattern(Builder builder) {
        zipCode = builder.zipCode;
        item = builder.item;
        date = builder.date;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public static Builder builder(Transaction transaction) {
        return new Builder(transaction);
    }

    public String getZipCode() {
        return zipCode;
    }

    public String getItem() {
        return item;
    }

    public Date getDate() {
        return date;
    }


    @Override
    public String toString() {
        return "TransactionPattern{" +
                "zipCode='" + zipCode + '\'' +
                ", item='" + item + '\'' +
                ", date=" + date +
                '}';
    }

    public static final class Builder {
        private String zipCode;
        private String item;
        private Date date;

        private Builder() {
        }

        private Builder(Transaction transaction) {
            this.zipCode = transaction.getZipCode();
            this.item = transaction.getItemPurchased();
            this.date = transaction.getPurchaseDate();
        }

        public Builder zipCode(String val) {
            zipCode = val;
            return this;
        }

        public Builder item(String val) {
            item = val;
            return this;
        }

        public Builder date(Date val) {
            date = val;
            return this;
        }

        public TransactionPattern build() {
            return new TransactionPattern(this);
        }
    }
}
public class RewardAccumulator {

    private final String customerName;
    private final double purchaseTotal;
    private int totalRewardPoints;
    private final int currentRewardPoints;

    private RewardAccumulator(String customerName, double purchaseTotal, int rewardPoints) {
        this.customerName = customerName;
        this.purchaseTotal = purchaseTotal;
        this.currentRewardPoints = rewardPoints;
        this.totalRewardPoints = rewardPoints;
    }

    public String getCustomerName() {
        return customerName;
    }

    public double getPurchaseTotal() {
        return purchaseTotal;
    }

    public int getTotalRewardPoints() {
        return totalRewardPoints;
    }

    public void addRewardPoints(int previousTotalPoints) {
        this.totalRewardPoints += previousTotalPoints;
    }

    @Override
    public String toString() {
        return "RewardAccumulator{" +
                "customerName='" + customerName + '\'' +
                ", purchaseTotal=" + purchaseTotal +
                ", totalRewardPoints=" + totalRewardPoints +
                ", currentRewardPoints=" + currentRewardPoints +
                '}';
    }

    public static Builder builder(Transaction transaction) { return new Builder(transaction); }

    public static final class Builder {
        private final String customerName;
        private final double purchaseTotal;
        private final int rewardPoints;

        private Builder(Transaction transaction) {
            this.customerName = transaction.getLastName() + "," + transaction.getFirstName();
            this.purchaseTotal = transaction.getPrice() * transaction.getQuantity();
            this.rewardPoints = (int) purchaseTotal;
        }


        public RewardAccumulator build() {
            return new RewardAccumulator(customerName, purchaseTotal, rewardPoints);
        }

    }
}

Defining Serializers/Deserializers

Next, we can define a JSON Serializer and Deserializer in order to transform the content while producing to and consuming from Kafka topics.

public class JsonSerializer<T> implements Serializer<T> {

    private final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
    }
}
public class JsonDeserializer<T> implements Deserializer<T> {

    private final Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if (Objects.isNull(deserializedClass)) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (Objects.isNull(bytes)) {
            return null;
        }
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {
    }
}

Defining Processors

Next, we can define the Processor objects to transform or process the objects from one form to another. We will extend the AbstractProcessor class, which provides no-op overrides for the punctuate and close methods and requires us to implement the process method.

public class CardAnonymizer extends AbstractProcessor<String, Transaction> {

    private static final String CC_NUMBER_REPLACEMENT = "xxxx-xxxx-xxxx-";

    @Override
    public void process(String key, Transaction transaction) {
        String last4Digits = transaction.getCardNumber().split("-")[3];
        Transaction updated = Transaction.builder(transaction).cardNumber(CC_NUMBER_REPLACEMENT + last4Digits).build();
        context().forward(key, updated);
        context().commit();
    }
}
public class TransactionPatterns extends AbstractProcessor<String, Transaction> {

    @Override
    public void process(String key, Transaction transaction) {
        TransactionPattern transactionPattern = TransactionPattern.newBuilder().date(transaction.getPurchaseDate())
                .item(transaction.getItemPurchased())
                .zipCode(transaction.getZipCode()).build();
        context().forward(key, transactionPattern);
        context().commit();
    }
}
public class CustomerRewards extends AbstractProcessor<String, Transaction> {

    @Override
    public void process(String key, Transaction value) {
        RewardAccumulator accumulator = RewardAccumulator.builder(value).build();
        context().forward(key, accumulator);
        context().commit();
    }
}

As you can see, once the processing is completed, the context().forward() method forwards the content to the downstream processors. Finally, the context().commit() method commits the current state of the stream along with the message offset.
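
One thing to keep in mind: newer Kafka Streams releases deprecate AbstractProcessor and the two-generic Processor interface in favor of org.apache.kafka.streams.processor.api.Processor, which operates on a Record object. Purely as a hedged sketch of what a migration of CardAnonymizer might look like (the CardAnonymizerV2 name is hypothetical and not part of this post's codebase):

public class CardAnonymizerV2 implements Processor<String, Transaction, String, Transaction> {

    private static final String CC_NUMBER_REPLACEMENT = "xxxx-xxxx-xxxx-";
    private ProcessorContext<String, Transaction> context;

    @Override
    public void init(ProcessorContext<String, Transaction> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, Transaction> record) {
        String last4Digits = record.value().getCardNumber().split("-")[3];
        Transaction updated = Transaction.builder(record.value())
                .cardNumber(CC_NUMBER_REPLACEMENT + last4Digits)
                .build();
        // forward a new Record carrying the masked transaction to the downstream processors
        context.forward(record.withValue(updated));
    }
}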

Building the DAG of Processors

Finally, we will define a directed acyclic graph (DAG) of processors to build a topology. We will first initiate the serializers and deserializers and then define a topology by adding the source, processors and sinks.

public class KafkaStreamsProcessorAPI {

    public static void main(String[] args) throws InterruptedException {
        final Logger logger = LoggerFactory.getLogger(KafkaStreamsProcessorAPI.class);

        // Setting the Properties
        StreamsConfig streamingConfig = new StreamsConfig(setProperties());

        // Serializing and Deserializing the objects
        JsonDeserializer<Transaction> transactionJsonDeserializer = new JsonDeserializer<>(Transaction.class);
        JsonSerializer<Transaction> transactionJsonSerializer = new JsonSerializer<>();
        JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
        JsonSerializer<TransactionPattern> transactionPatternJsonSerializer = new JsonSerializer<>();
        StringDeserializer stringDeserializer = new StringDeserializer();
        StringSerializer stringSerializer = new StringSerializer();

        // Building the Topology to process the streams
        Topology topologyBuilder = new Topology();
        topologyBuilder
                .addSource("SOURCE", stringDeserializer, transactionJsonDeserializer, ApplicationConstants.SOURCE_TOPIC)
                .addProcessor("PROCESS1", CardAnonymizer::new, "SOURCE")
                .addProcessor("PROCESS2", TransactionPatterns::new, "PROCESS1")
                .addProcessor("PROCESS3", CustomerRewards::new, "PROCESS1")
                .addSink("SINK1", ApplicationConstants.SINK1_TOPIC, stringSerializer, transactionPatternJsonSerializer, "PROCESS2")
                .addSink("SINK2", ApplicationConstants.SINK2_TOPIC, stringSerializer, rewardAccumulatorJsonSerializer, "PROCESS3")
                .addSink("SINK3", ApplicationConstants.SINK3_TOPIC, stringSerializer, transactionJsonSerializer, "PROCESS1");

        // Initiating the Producer to produce transactions data
        logger.info("Producing Transaction Processor Messages to Kafka");
        FakeDataProducer.produceTransactionsData();

        // Managing the Transaction stream and the producer
        logger.info("Starting Kafka Streams Transaction Processor");
        KafkaStreams streaming = new KafkaStreams(topologyBuilder, streamingConfig);
        streaming.start();
        logger.info("Now started Kafka Streams Transaction Processor");
        Thread.sleep(65000);
        logger.info("Shutting down the Kafka Streams Transaction Processor now");
        streaming.close();
        FakeDataProducer.shutdown();
    }

    private static Properties setProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "TransactionProcessorJob");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction-consumer-group");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transaction-processor-api");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return props;
    }
}

Since the Processor API provides better control over the details while building data streaming applications, it serves as the low-level abstraction underneath KStream applications.

Kafka Streams – The KStream DSL API implementation

In the previous section, we took a look at the low-level Processor API, where we built a processor topology to handle the transaction streaming requirements, derive patterns and mask some sensitive information. Although we built a nontrivial processing topology, the entire process, i.e., the transformations and the operations, was completely stateless. This means that we considered each transaction in isolation, without taking other events processed at the same time into account.

Now we will have a look at the high-level Kafka Streams DSL API. Where the Processor API takes a more imperative approach, the KStream DSL uses a more functional, declarative style. The KStream DSL consists of two main abstractions: the KStream and KTable interfaces.

  • KStream is an infinite, unbounded stream of immutable records, where each key-value pair is an independent record. Whenever a new record is consumed from a Kafka topic, it gets appended to the KStream.

    KStream

  • KTable, on the other hand, is quite similar to a database table. It is a changelog stream where a record that arrives later is considered an update to the earlier record with the same key. A null value deletes the record, while a re-appearing key updates it (a minimal sketch follows below).

    KTable
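
Although this post concentrates on KStream, a minimal sketch of how a KTable is typically created from a topic may help; the "customers" topic and the String serdes below are assumptions for illustration only. Records arriving later for the same key overwrite the table's current value, and a null value acts as a delete (tombstone).

StreamsBuilder builder = new StreamsBuilder();

// Each record in the "customers" topic is treated as an upsert by key;
// a null value for an existing key is treated as a delete (tombstone).
KTable<String, String> customerTable = builder.table(
        "customers",
        Consumed.with(Serdes.String(), Serdes.String()));

// A KTable can be turned back into a changelog KStream when needed.
KStream<String, String> customerUpdates = customerTable.toStream();
customerUpdates.to("customer-updates");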

We are going to predominantly look into KStream, as that is what our use-case needs. So we will first convert the same stateless transformation using KStream, and then we can look into stateful transformations and maintain a contextual state store.

Stateless Transformation using the Same Processor Topology

We are going to use the same processor topology and convert it to KStream. So we first define the serializers and deserializers and wrap them into Serdes. Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of record keys and record values (e.g. java.lang.String) to materialize the data when necessary. Then we initiate a StreamsBuilder and map values on the streams for the same workflow that we tried earlier.

public class KafkaStreamsDSLAPI {

    public static void main(String[] args) throws InterruptedException {
        final Logger logger = LoggerFactory.getLogger(KafkaStreamsDSLAPI.class);

        // Setting the Properties
        StreamsConfig streamsConfig = new StreamsConfig(setProperties());

        // Serializing and Deserializing the transaction
        JsonDeserializer<Transaction> transactionJsonDeserializer = new JsonDeserializer<>(Transaction.class);
        JsonSerializer<Transaction> transactionJsonSerializer = new JsonSerializer<>();

        // Serializing and Deserializing the rewards
        JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<RewardAccumulator> rewardAccumulatorJsonDeserializer = new JsonDeserializer<>(RewardAccumulator.class);

        // Serializing and Deserializing the transaction pattern
        JsonSerializer<TransactionPattern> transactionPatternJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<TransactionPattern> transactionPatternJsonDeserializer = new JsonDeserializer<>(TransactionPattern.class);

        // Producing Serdes out of the serializers and deserializers
        Serde<RewardAccumulator> rewardAccumulatorSerde = Serdes.serdeFrom(rewardAccumulatorJsonSerializer, rewardAccumulatorJsonDeserializer);
        Serde<TransactionPattern> transactionPatternSerde = Serdes.serdeFrom(transactionPatternJsonSerializer, transactionPatternJsonDeserializer);
        Serde<Transaction> transactionSerde = Serdes.serdeFrom(transactionJsonSerializer, transactionJsonDeserializer);
        Serde<String> stringSerde = Serdes.String();

        // Initiating the Stream Builder
        StreamsBuilder kStreamBuilder = new StreamsBuilder();

        // Masking the card details in the transaction objects
        KStream<String, Transaction> transactionKStream = kStreamBuilder
                .stream(ApplicationConstants.SOURCE_TOPIC, Consumed.with(stringSerde, transactionSerde))
                .mapValues(p -> Transaction.builder(p).maskCard().build());

        // Retrieving a common pattern between transactions and forwarding it to the patterns topic
        KStream<String, TransactionPattern> patternKStream = transactionKStream
                .mapValues(transaction -> TransactionPattern.builder(transaction).build());
        patternKStream.print(Printed.<String, TransactionPattern>toSysOut().withLabel(ApplicationConstants.SINK1_TOPIC));
        patternKStream.to(ApplicationConstants.SINK1_TOPIC, Produced.with(stringSerde, transactionPatternSerde));

        // Retrieving rewards from transactions and forwarding them to the rewards topic
        KStream<String, RewardAccumulator> rewardsKStream = transactionKStream
                .mapValues(transaction -> RewardAccumulator.builder(transaction).build());
        rewardsKStream.print(Printed.<String, RewardAccumulator>toSysOut().withLabel(ApplicationConstants.SINK2_TOPIC));
        rewardsKStream.to(ApplicationConstants.SINK2_TOPIC, Produced.with(stringSerde, rewardAccumulatorSerde));

        // Forwarding the masked transactions to the transactions topic
        transactionKStream.print(Printed.<String, Transaction>toSysOut().withLabel(ApplicationConstants.SINK3_TOPIC));
        transactionKStream.to(ApplicationConstants.SINK3_TOPIC, Produced.with(stringSerde, transactionSerde));

        // Initiating the Producer to produce transactions data
        logger.info("Producing Transaction Processor Messages to Kafka");
        FakeDataProducer.produceTransactionsData();

        // Managing the Transaction stream and the producer
        logger.info("Starting Transaction Streams DSL");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder.build(), streamsConfig);
        kafkaStreams.start();
        logger.info("Now started Transaction Streams DSL");
        Thread.sleep(65000);
        logger.info("Shutting down the Kafka Streams Transaction Processor now");
        kafkaStreams.close();
        FakeDataProducer.shutdown();
    }

    private static Properties setProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "KafkaStreamsDSLJob");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-transaction");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-dsl-api");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return props;
    }
}

As you can see, we have used a few stateless functions to transform the events or messages from one form to another. So let's categorize these stateless functions and look into their operations so that we get a clear picture of their actual usage.

Stateless Operations using KStream

The KStream DSL provides numerous functional methods, but we will specifically look into a few of them for our stateless operations:

  • filter/filterNot
  • map
  • mapValues
  • flatMap
  • flatMapValues
  • branch
  • merge
  • selectKey
  • to
  • through
  • print
  • peek

filter/filterNot

If we want to apply a condition on a KStream so that the resulting stream filters out a number of records, we can use the filter method. In the example below, we keep only the records whose price is greater than 1000.

transactionKStream.filter((key, value) -> value.getPrice() > 1000);

Filter

filterNot is exactly the opposite of the filter method.

transactionKStream.filterNot((key, value) -> value.getPrice() < 100);

map

This is one of the most commonly used stateless operations. It can be used to transform a given record in the input KStream by simply applying a mapper function. Let's say we want to convert the key to uppercase; then we can use the following map method:

transactionKStream.map((KeyValueMapper<String, Transaction, KeyValue<String, Transaction>>) (k, v) ->
                new KeyValue<>(k.toUpperCase(), v));

Map

mapValues

This is the same as map, but it applies only when you want to alter the values.

transactionKStream.mapValues(value -> value.getFirstName().toUpperCase());

MapValues

flatMap

The flatMap method is similar to the map method, but it allows you to return multiple records. flatMap can change the keys and therefore triggers a repartition.

In the following example, each record from the stream gets flat-mapped: the comma-separated list of items purchased is first split into its constituent strings, and each string is then mapped into a KeyValue pair, forming a list of records.

transactionKStream.flatMap((k, transaction) -> Arrays.stream(transaction.getItemPurchased().split(","))
                .map(item -> new KeyValue<>(k, item))
                .collect(Collectors.toList()));

FlatMap

flatMapValues

flatMapValues is similar to flatMap but applies only to the values. flatMapValues does not change the keys, hence it does not trigger a repartition either.

transactionKStream.flatMapValues(transaction -> Arrays.stream(transaction.getItemPurchased().split(","))
                .map(String::trim)
                .collect(Collectors.toList()));

branch

If we want to split a KStream into multiple KStreams using one or more predicates, we can use the branch method. In the following example, we define predicates which are evaluated in order. If the record's price is less than 1000, it becomes part of the first KStream. If the price is between 1000 and 5000, it becomes part of the second KStream. Finally, all other records become part of the last KStream.

transactionKStream.branch(
                (key, value) -> value.getPrice() < 1000,
                (key, value) -> value.getPrice() < 5000,
                (key, value) -> value.getPrice() >= 5000);

Branch
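
As a side note, branch() with varargs predicates is deprecated in recent Kafka Streams releases (2.8+) in favor of split(), which returns named branches in a Map. A rough sketch under that assumption, reusing the same price predicates, could look like this:

Map<String, KStream<String, Transaction>> branches = transactionKStream
        .split(Named.as("price-"))
        .branch((key, value) -> value.getPrice() < 1000, Branched.as("low"))
        .branch((key, value) -> value.getPrice() < 5000, Branched.as("medium"))
        .defaultBranch(Branched.as("high"));

// Branch names are prefixed with the Named value passed to split()
KStream<String, Transaction> lowPriced = branches.get("price-low");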

merge

merge is used to merge two KStreams into a single KStream. One thing to note is that the resulting stream may not have all of its records in order.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.merge(stream2).to("final-topic");

selectKey

selectKey assigns a new key to each record. In the following example, selectKey assigns the first letter of the old key as the new key.

transactionKStream.selectKey((key, value) -> key.substring(0, 1));

SelectKey

to

to is considered a terminal operation in Kafka Streams; it returns void instead of an intermediate KStream. This method can be used to write the records of a KStream to a Kafka topic. It is also overloaded to accept a Produced object to customize the Serdes and the partitioner.

transactionKStream.to("topic");
transactionKStream.to(
                "another-topic",
                Produced.with(
                        Serdes.String(),
                        Serdes.serdeFrom(
                            new JsonSerializer<>(),
                            new JsonDeserializer<>(Transaction.class))
                )
);

Sometimes we may come across a situation where, instead of defining a static topic name, we need to choose a topic dynamically by forming a string. KStream provides a to() overload that accepts a TopicNameExtractor; it exposes the RecordContext holding the record's metadata, from which we can easily retrieve the topic name and append something to it to form a dynamic topic.

transactionKStream.to((key, value, recordContext) -> recordContext.topic() + "_intermediate");

through

While handling a multi-step transformation, we often have to push intermediate results to topics and then finally process them into a resulting topic. We can use the through method to push intermediate transformation records.

transactionKStream.mapValues(v -> v.getFirstName().toUpperCase())
                .through("intermediate-topic")
                .filter((k, v) -> v.length() > 1)
                .to("output-topic");

print

print is another terminal operation that prints the records either to the console or to a file.

transactionKStream.print(Printed.<String, Transaction>toSysOut().withLabel("topic"));

peek

The peek method works the same way as print, but unlike print, which is a terminal method, peek returns the same instance of KStream. It accepts a ForeachAction, which can be used to specify what you want to do for each record, like logging the key and the value.

transactionKStream.mapValues(v -> v.getFirstName().toUpperCase())
                .peek((k, v) -> logger.info("key={}, value={}", k, v))
                .to("output-topic");

Stateful Operations using KStream

Usually, while dealing with big data, we need context to make meaningful decisions. In stream processing, this mechanism is termed the context state. Although stream processing implies a constant flow of discrete records, adding a context state helps perform stateful processing of the data within a moving window frame. If we inherit the same transaction-processing topology, we can add a context state to keep a cumulative track of reward points and use it to offer better reward points.

The most basic function to turn the stateless processor into a stateful one is transformValues. This method is semantically similar to mapValues, but transformValues has access to a contextual StateStore instance to perform its job. So, to start with this implementation, we first have to define an instance of ValueTransformer which will consume an incoming transaction record and convert it into a RewardAccumulator object.

public class TransactionRewardTransformer implements ValueTransformer<Transaction, RewardAccumulator> {

    private KeyValueStore<String, Integer> stateStore;
    private final String storeName;

    public TransactionRewardTransformer(String storeName) {
        Objects.requireNonNull(storeName, "Store Name can't be null");
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext context) {
        stateStore = context.getStateStore(storeName);
    }

    @Override
    public RewardAccumulator transform(Transaction value) {
        RewardAccumulator rewardAccumulator = RewardAccumulator.builder(value).build();
        Integer accumulatedSoFar = stateStore.get(rewardAccumulator.getCustomerName());

        if (Objects.nonNull(accumulatedSoFar)) {
            rewardAccumulator.addRewardPoints(accumulatedSoFar);
        }
        stateStore.put(rewardAccumulator.getCustomerName(), rewardAccumulator.getTotalRewardPoints());
        return rewardAccumulator;
    }

    @Override
    public void close() {}
}

Now, a Kafka producer distributes records across a topic's partitions evenly in a round-robin fashion. We can see a problem here: due to the round-robin assignment, the transactions for a given customer won't land on the same partition every time. It is essential to produce the transactions with the same customer ID to the same partition; otherwise, transactions for the same customer would be spread across different partitions, and we would have to look up the same customer in multiple state stores. Hence, we need to repartition the records by customer ID. We can define an instance of StreamPartitioner to recalculate the partition assignment by taking the hash of the customer ID modulo the number of partitions.

public class RewardsStreamPartitioner implements StreamPartitioner<String, Transaction> {

    @Override
    public Integer partition(String s, String s2, Transaction transaction, int numPartitions) {
        return transaction.getCustomerId().hashCode() % numPartitions;
    }
}

Repartitioning in Kafka Streams can be easily achieved by using the through() method that we discussed above. This method creates an intermediate topic, and the current KStream instance starts writing records to that topic. A new KStream instance is then returned from the through() call, using the same intermediate topic as its source. In this way, the records get repartitioned seamlessly.
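
It is worth noting that through() has been deprecated in recent Kafka Streams releases (2.6 and later) in favor of repartition(), which manages the intermediate topic for you. A rough, roughly equivalent sketch using the serdes and the partitioner defined in this example (the "customer_transactions" name is simply carried over) would be:

KStream<String, Transaction> transByCustomerStream = transactionKStream
        .repartition(Repartitioned.<String, Transaction>with(stringSerde, transactionSerde)
                // Kafka Streams derives the actual repartition topic name from this, prefixed with the application id
                .withName("customer_transactions")
                .withStreamPartitioner(streamPartitioner));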

Finally, we can transform the earlier stateless implementation into a stateful processor in the following way:

public class KafkaStreamsDSLStatefulAPI {

    public static void main(String[] args) throws InterruptedException {
        final Logger logger = LoggerFactory.getLogger(KafkaStreamsDSLStatefulAPI.class);

        // Setting the Properties
        StreamsConfig streamsConfig = new StreamsConfig(setProperties());

        // Serializing and Deserializing the transaction
        JsonDeserializer<Transaction> transactionJsonDeserializer = new JsonDeserializer<>(Transaction.class);
        JsonSerializer<Transaction> transactionJsonSerializer = new JsonSerializer<>();

        // Serializing and Deserializing the rewards
        JsonSerializer<RewardAccumulator> rewardAccumulatorJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<RewardAccumulator> rewardAccumulatorJsonDeserializer = new JsonDeserializer<>(RewardAccumulator.class);

        // Serializing and Deserializing the transaction pattern
        JsonSerializer<TransactionPattern> transactionPatternJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<TransactionPattern> transactionPatternJsonDeserializer = new JsonDeserializer<>(TransactionPattern.class);

        // Producing Serdes out of the serializers and deserializers
        Serde<RewardAccumulator> rewardAccumulatorSerde = Serdes.serdeFrom(rewardAccumulatorJsonSerializer, rewardAccumulatorJsonDeserializer);
        Serde<TransactionPattern> transactionPatternSerde = Serdes.serdeFrom(transactionPatternJsonSerializer, transactionPatternJsonDeserializer);
        Serde<Transaction> transactionSerde = Serdes.serdeFrom(transactionJsonSerializer, transactionJsonDeserializer);
        Serde<String> stringSerde = Serdes.String();

        // Initiating the Stream Builder
        StreamsBuilder kStreamBuilder = new StreamsBuilder();

        // Masking the card details in the transaction objects
        KStream<String, Transaction> transactionKStream = kStreamBuilder
                .stream(ApplicationConstants.SOURCE_TOPIC, Consumed.with(stringSerde, transactionSerde))
                .mapValues(p -> Transaction.builder(p).maskCard().build());

        // Retrieving a common pattern between transactions and forwarding it to the patterns topic
        KStream<String, TransactionPattern> patternKStream = transactionKStream
                .mapValues(transaction -> TransactionPattern.builder(transaction).build());
        patternKStream.print(Printed.<String, TransactionPattern>toSysOut().withLabel(ApplicationConstants.SINK1_TOPIC));
        patternKStream.to(ApplicationConstants.SINK1_TOPIC, Produced.with(stringSerde, transactionPatternSerde));

        // Adding state to the processor
        String rewardsStateStoreName = "rewardsPointsStore";
        RewardsStreamPartitioner streamPartitioner = new RewardsStreamPartitioner();
        KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(rewardsStateStoreName);
        StoreBuilder<KeyValueStore<String, Integer>> storeBuilder = Stores
                .keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.Integer());
        kStreamBuilder.addStateStore(storeBuilder);
        KStream<String, Transaction> transByCustomerStream = transactionKStream
                .through("customer_transactions", Produced.with(stringSerde, transactionSerde, streamPartitioner));

        // Retrieving rewards from transactions and forwarding them to the rewards topic
        KStream<String, RewardAccumulator> statefulRewardAccumulator = transByCustomerStream
                .transformValues(() -> new TransactionRewardTransformer(rewardsStateStoreName), rewardsStateStoreName);
        statefulRewardAccumulator.print(Printed.<String, RewardAccumulator>toSysOut().withLabel(ApplicationConstants.SINK2_TOPIC));
        statefulRewardAccumulator.to(ApplicationConstants.SINK2_TOPIC, Produced.with(stringSerde, rewardAccumulatorSerde));

        // Forwarding the masked transactions to the transactions topic
        transactionKStream.print(Printed.<String, Transaction>toSysOut().withLabel(ApplicationConstants.SINK3_TOPIC));
        transactionKStream.to(ApplicationConstants.SINK3_TOPIC, Produced.with(stringSerde, transactionSerde));

        // Initiating the Producer to produce transactions data
        logger.info("Producing Transaction Processor Messages to Kafka");
        FakeDataProducer.produceTransactionsData();

        // Managing the Transaction stream and the producer
        logger.info("Starting Transaction Streams Stateful DSL");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder.build(), streamsConfig);
        kafkaStreams.start();
        logger.info("Now started Transaction Streams Stateful DSL");
        Thread.sleep(65000);
        logger.info("Shutting down the Kafka Streams Transaction Processor now");
        kafkaStreams.close();
        FakeDataProducer.shutdown();
    }

    private static Properties setProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "KafkaStreamsDSLJob");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-transaction");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-dsl-stateful-api");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstants.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TransactionTimestampExtractor.class);
        return props;
    }
}

The entire stateful mechanism is handled by the transformValues processor, hence it is worth taking a closer look at how it works.

transformValues

Joined Streams

Sometimes we get a requirement to aggregate events of the same category and then forward them for further processing. Let's say, in our context, we need to categorize the transactions into customers who have bought computers and those who have bought bags, and then aggregate and forward them for further processing. This kind of information would help a company offer instant coupons to customers based on the sale of a particular item.

As streams need state to handle this kind of scenario, we can join events from two streams that share the same key and combine them to form a new event. We can use the branch() method to categorize the items and then join the streams within a window frame to aggregate the events.

So let's define an instance of ValueJoiner to aggregate two transactions into a correlated transaction. We can first define the CorrelatedTransaction object.

public class CorrelatedTransaction {

    private final String customerName;
    private final List<String> itemsPurchased;
    private final double totalAmount;
    private final Date firstPurchaseTime;
    private final Date secondPurchaseTime;

    private CorrelatedTransaction(Builder builder) {
        customerName = builder.customerName;
        itemsPurchased = builder.itemsPurchased;
        totalAmount = builder.totalAmount;
        firstPurchaseTime = builder.firstPurchasedItem;
        secondPurchaseTime = builder.secondPurchasedItem;
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public String getCustomerName() {
        return customerName;
    }

    public List<String> getItemsPurchased() {
        return itemsPurchased;
    }

    public double getTotalAmount() {
        return totalAmount;
    }

    public Date getFirstPurchaseTime() {
        return firstPurchaseTime;
    }

    public Date getSecondPurchaseTime() {
        return secondPurchaseTime;
    }


    @Override
    public String toString() {
        return "CorrelatedTransaction{" +
                "customerName='" + customerName + '\'' +
                ", itemsPurchased=" + itemsPurchased +
                ", totalAmount=" + totalAmount +
                ", firstPurchaseTime=" + firstPurchaseTime +
                ", secondPurchaseTime=" + secondPurchaseTime +
                '}';
    }

    public static final class Builder {
        private String customerName;
        private List<String> itemsPurchased;
        private double totalAmount;
        private Date firstPurchasedItem;
        private Date secondPurchasedItem;

        private Builder() {
        }

        public Builder withCustomerName(String val) {
            customerName = val;
            return this;
        }

        public Builder withItemsPurchased(List<String> val) {
            itemsPurchased = val;
            return this;
        }

        public Builder withTotalAmount(double val) {
            totalAmount = val;
            return this;
        }

        public Builder withFirstPurchaseDate(Date val) {
            firstPurchasedItem = val;
            return this;
        }

        public Builder withSecondPurchaseDate(Date val) {
            secondPurchasedItem = val;
            return this;
        }

        public CorrelatedTransaction build() {
            return new CorrelatedTransaction(this);
        }
    }
}
public class TransactionJoiner implements ValueJoiner<Transaction, Transaction, CorrelatedTransaction> {

    @Override
    public CorrelatedTransaction apply(Transaction transaction, Transaction otherTransaction) {

        CorrelatedTransaction.Builder builder = CorrelatedTransaction.newBuilder();

        Date purchaseDate = Objects.nonNull(transaction) ? transaction.getPurchaseDate() : null;
        Double price = Objects.nonNull(transaction) ? transaction.getPrice() : 0.0;
        String itemPurchased = Objects.nonNull(transaction) ? transaction.getItemPurchased() : null;

        Date otherPurchaseDate = Objects.nonNull(otherTransaction) ? otherTransaction.getPurchaseDate() : null;
        Double otherPrice = Objects.nonNull(otherTransaction) ? otherTransaction.getPrice() : 0.0;
        String otherItemPurchased = Objects.nonNull(otherTransaction) ? otherTransaction.getItemPurchased() : null;

        List<String> purchasedItems = new ArrayList<>();

        if (Objects.nonNull(itemPurchased)) {
            purchasedItems.add(itemPurchased);
        }

        if (Objects.nonNull(otherItemPurchased)) {
            purchasedItems.add(otherItemPurchased);
        }

        String customerName = Objects.nonNull(transaction)
                ? transaction.getFirstName() + " " + transaction.getLastName() : null;
        String otherCustomerName = Objects.nonNull(otherTransaction)
                ? otherTransaction.getFirstName() + " " + otherTransaction.getLastName() : null;

        builder.withCustomerName(Objects.nonNull(customerName) ? customerName : otherCustomerName)
                .withFirstPurchaseDate(purchaseDate)
                .withSecondPurchaseDate(otherPurchaseDate)
                .withItemsPurchased(purchasedItems)
                .withTotalAmount(price + otherPrice);

        return builder.build();
    }
}

Finally, we can define the branches and perform the join to aggregate and print the KStream records.

        // Predicates to branch the transactions before performing the join
        Predicate<String, Transaction> computerTransaction = (key, transaction) -> transaction.getItemPurchased().contains("Computer");
        Predicate<String, Transaction> bagTransaction = (key, transaction) -> transaction.getItemPurchased().contains("Bag");

        // Initiating the Stream Builder
        StreamsBuilder kStreamBuilder = new StreamsBuilder();

        // Masking the card details in the transaction objects
        KStream<String, Transaction> transactionKStream = kStreamBuilder
                .stream(ApplicationConstants.SOURCE_TOPIC, Consumed.with(stringSerde, transactionSerde))
                .mapValues(p -> Transaction.builder(p).maskCard().build());

        // Branch and join the streams by customerId for a given item category
        final int COMPUTER_TRANSACTION = 0;
        final int BAG_TRANSACTION = 1;
        KStream<String, Transaction>[] branchesStream = transactionKStream
                .selectKey((k, v) -> v.getCustomerId())
                .branch(computerTransaction, bagTransaction);
        KStream<String, Transaction> computerStream = branchesStream[COMPUTER_TRANSACTION];
        KStream<String, Transaction> bagStream = branchesStream[BAG_TRANSACTION];
        ValueJoiner<Transaction, Transaction, CorrelatedTransaction> transactionJoiner = new TransactionJoiner();
        JoinWindows twentyMinuteWindow = JoinWindows.of(Duration.ofMinutes(20));
        KStream<String, CorrelatedTransaction> joinedKStream = computerStream.join(
                bagStream,
                transactionJoiner,
                twentyMinuteWindow,
                StreamJoined.with(stringSerde, transactionSerde, transactionSerde));
        joinedKStream.print(Printed.<String, CorrelatedTransaction>toSysOut().withLabel("joined KStream"));

KStream Joins

Conclusion

In this chapter, we took a deep dive into Kafka Streams and its APIs. Kafka Streams also provides a similar set of APIs to support KTable, which works much like a database table. Now that we have the data streaming process sorted out and completely handled, all we need is to provide configurations and support to integrate multiple data sources. So we will have a look into the Kafka Connect APIs in our next chapter. All the code implemented as part of this chapter can be found on GitHub.
