
How to Read Kafka Clickstream Event Data in Pandas | by Wen Yang | Oct, 2022


Build a Kafka consumer to read event data within a specific date range in a Jupyter notebook

Photo by Jonatan Pie on Unsplash

Recently, I was assigned a seemingly simple task →

"We started capturing events from a new app. Could you verify that such test events landed in Kafka properly?"

I have a rough idea of how data flows in our system: clickstream data collected from both web and mobile apps streams to MetaRouter first, and then MetaRouter acts as a Kafka producer, publishing event logs to a certain Kafka topic. The events generated from our new app share a writeKey. Therefore, in order to read these events, I would need to:

  1. Create a Kafka consumer to listen to this Kafka topic
  2. Since I know such test events were produced within a certain date range, I want the Kafka consumer to read only events for the specified dates.
  3. Store the data in a form that I can filter and analyze, ideally a pandas dataframe.

So my goal is to achieve the data flow from Kafka -> Pandas!

After several days of googling around StackOverflow, GitHub, and various sites, I finally got it working! Below are my code snippets on how to implement this in two of the most popular Python libraries for Kafka.

Solution 1: kafka-python

Prerequisite: pip install kafka-python (the latest version, 2.0.2, is used in my notebook)

First, we need to import the libraries below and set the Kafka environment variables. The setup script below can be reused in Solution 2 with one minor change.
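
The original setup gist isn't reproduced here, so the following is a minimal sketch of what it might contain; the broker address, group id, and security settings are placeholder assumptions, not the actual values.

# Minimal setup sketch (assumed values, adjust to your cluster)
from datetime import datetime, timezone

import pandas as pd
from kafka import KafkaConsumer, TopicPartition  # line 10: swapped for confluent_kafka in Solution 2

# Kafka environment variables (placeholders for illustration)
topic = 'analytics__pageview'
partition = 0
bootstrap_servers = 'localhost:9092'
group_id = 'clickstream-notebook-reader'
security_protocol = 'SSL'  # whatever your cluster requires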

Next, build a Kafka consumer to read events from a specific datetime range. There are 5 steps:

  • Step 1: Since I know the test events were sent on 2022-09-22 between 12 pm and 2 pm (UTC), I use the datetime function to create dt_start and dt_end to bound the time range.
  • Step 2: In Kafka, only events from the same partition are in order, so we need to read events from a specified partition. (Say you have 6 partitions for the topic; you can pick any number from 0-5 to use as the partition.)
  • Step 3: The basic consumer requires topic, bootstrap_servers and group_id. I found that in a Jupyter notebook, if I don't provide security_protocol it throws errors.
  • Step 4: This is the key! The way it works is like this:
    – datetime object → gets converted to a UTC timestamp in milliseconds → gets converted to the corresponding offset number in a Topic Partition
    – the main function is consumer.offsets_for_times({tp: dt_start.timestamp() * 1000})
  • Step 5: Use seek to fetch events starting from the desired starting time
    – every message has an offset property, and we compare it with the offset of the desired ending time to decide whether to continue or break

Enough talking, here's the full code →
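
The original gist isn't embedded here, so the snippet below is a sketch reconstructing the five steps, reusing the setup variables from the script above; the connection details and JSON-encoded message values are assumptions.

# Sketch of the five steps (not the original gist); assumes the setup
# variables topic, partition, bootstrap_servers, group_id, security_protocol,
# and that message values are JSON-encoded bytes.
import json

# Step 1: bound the time range (test events sent 2022-09-22, 12 pm to 2 pm UTC)
dt_start = datetime(2022, 9, 22, 12, 0, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, 0, tzinfo=timezone.utc)

# Step 2: read from a single partition so events come back in order
tp = TopicPartition(topic, partition)

# Step 3: basic consumer; security_protocol avoids connection errors in Jupyter
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
    security_protocol=security_protocol,
    enable_auto_commit=False,
)
consumer.assign([tp])

# Step 4: datetime -> UTC timestamp in ms -> offset in the topic partition
offset_start = consumer.offsets_for_times({tp: int(dt_start.timestamp() * 1000)})
offset_end = consumer.offsets_for_times({tp: int(dt_end.timestamp() * 1000)})

# Step 5: seek to the starting offset and read until the ending offset
consumer.seek(tp, offset_start[tp].offset)

records = []
for msg in consumer:
    if msg.offset >= offset_end[tp].offset:
        break
    records.append(json.loads(msg.value.decode('utf-8')))

consumer.close()
df = pd.DataFrame(records)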

And after that, I can query the writeKey of our new app in Pandas! 🐼
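
As a quick sanity check, the filter might look like this (the writeKey column and its value are placeholders; the actual key is specific to the new app):

# Hypothetical filter on the new app's writeKey (placeholder value)
new_app_events = df[df['writeKey'] == '<new-app-writeKey>']
new_app_events.head()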

The above solution is inspired by a similar question on StackOverflow. Actually, that is where I started searching extensively and found there is no equivalent solution using confluent-kafka. Since my original code is based on confluent-kafka instead of kafka-python, I was puzzled by their seeming similarity yet nuanced differences.

Now I'm happy to introduce my own solution using confluent-kafka 😃~~~

Solution 2: confluent-kafka

Prerequisite: pip install confluent-kafka (the latest version, 1.9.2, is used in my notebook)

Here you can use the same setup script from Solution 1 with one minor change:

  • change line 10 to from confluent_kafka import Consumer, TopicPartition

Next, we need to build a Kafka consumer to read events from a specific datetime range. At a high level, we still need the same 5 steps, but the main difference is that we need to use on_assign to achieve what seek does: fetching a specific offset from a Topic Partition.

Step 1: Same as Solution 1, we need datetime objects to bound the seek range.

Step 2: Same as Solution 1. One tricky thing is that most of the time you can use a string as the topic, such as topic = 'analytics__pageview', but when you want to subscribe, it only accepts a list, such as consumer.subscribe(['analytics__pageview'])! (As Dumbledore might say: "How peculiar~~" 🧙)

Step 3: Almost identical to Solution 1, except for changing = to : in the variable assignments, since the confluent-kafka Consumer takes its settings as a config dict.

Step 4: Here's where you'll see the nuanced differences! We need an extra step 4c to construct on_assign. This function originally comes from the GitHub confluent-kafka-python project, provided by Magnus Edenhill.
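
The gist for this step isn't reproduced here, but a minimal sketch of step 4c, adapted from that pattern, might look like this (dt_start and partition come from steps 1 and 2):

# Step 4c (sketch): rewind the chosen partition to the offset for dt_start
def on_assign(consumer, partitions):
    # keep only the partition we decided to read in step 2
    wanted = [p for p in partitions if p.partition == partition]
    for p in wanted:
        # offsets_for_times() reads the timestamp (ms) from the offset field
        p.offset = int(dt_start.timestamp() * 1000)
    # translate timestamps to real offsets, then override the assignment
    consumer.assign(consumer.offsets_for_times(wanted))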

Step 5: Instead of using seek, here we use subscribe with both the topic (in list form) and on_assign to fetch the offset for the desired starting time. And we need to call close() after fetching.

One more detail worth noting is how to get the offset number.

In kafka-python, you use offset_start[tp].offset, since offset_start is a dictionary.

offset_start = consumer.offsets_for_times({tp: dt_start.timestamp() * 1000})

# to print out the offset number
offset_start[tp].offset

Output: (notice the { } indicating the type is a dict)

In confluent-kafka, offset_start is a list, so you use offset_start[0].offset

tp_in = TopicPartition(topic=topic, partition=partition,
                       offset=int(dt_start.timestamp() * 1000))
offset_start = c.offsets_for_times([tp_in])

# to print out the offset number
offset_start[0].offset

Output: (notice the [ ] indicating the type is a list)

Alrighty, here's the full code for the confluent-kafka implementation.
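
The original gist isn't embedded here, so below is a reconstruction of the five steps under the same placeholder assumptions as Solution 1 (broker address, group id, and JSON-encoded message values):

# Sketch of the five steps for confluent-kafka (not the original gist);
# broker, group id, and topic name are assumed values.
import json
from datetime import datetime, timezone

import pandas as pd
from confluent_kafka import Consumer, TopicPartition

# Step 1: bound the seek range
dt_start = datetime(2022, 9, 22, 12, 0, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, 0, tzinfo=timezone.utc)

# Step 2: topic as a string here; subscribe() needs it wrapped in a list
topic = 'analytics__pageview'
partition = 0

# Step 3: same settings as Solution 1, written as a config dict (= becomes :)
c = Consumer({
    'bootstrap.servers': 'localhost:9092',   # placeholder broker
    'group.id': 'clickstream-notebook-reader',
    'security.protocol': 'SSL',              # whatever your cluster requires
    'enable.auto.commit': False,
})

# Steps 4a/4b: look up the ending offset so the read loop knows where to stop
tp_end = TopicPartition(topic=topic, partition=partition,
                        offset=int(dt_end.timestamp() * 1000))
offset_end = c.offsets_for_times([tp_end])[0].offset

# Step 4c: on_assign rewinds the chosen partition to the starting offset
def on_assign(consumer, partitions):
    wanted = [p for p in partitions if p.partition == partition]
    for p in wanted:
        p.offset = int(dt_start.timestamp() * 1000)
    consumer.assign(consumer.offsets_for_times(wanted))

# Step 5: subscribe with the topic in a list plus on_assign, poll, then close()
c.subscribe([topic], on_assign=on_assign)

records = []
while True:
    msg = c.poll(timeout=10.0)
    if msg is None or msg.error():
        continue  # a production reader would also handle repeated empty polls
    if msg.offset() >= offset_end:
        break
    records.append(json.loads(msg.value().decode('utf-8')))

c.close()
df = pd.DataFrame(records)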

  • It's a trend to build event-driven applications, and I foresee a growing need for Data Scientists to be able to quickly process and do simple exploratory analysis on event data. This can help inform which data fields should be further transformed and introduced into the ETL pipeline, which probably should involve Faust and ksql rather than pandas.