Build a Kafka consumer to read event data within a specific date range in a Jupyter notebook
Recently, I was assigned a seemingly simple task →
“We started to capture events from a brand new app. Could you verify such test events landed in Kafka properly?”
I have a rough idea of how data flows in our system: clickstream data collected from both Web and mobile apps streams to MetaRouter first, and then MetaRouter acts as a Kafka producer, producing event logs to a certain Kafka topic. The events generated from our new app have a shared writeKey. Therefore, in order to read these events, I would need to:
- Create a Kafka consumer to listen to this Kafka topic
- Since I know such test events were produced within a certain date range, I want to make the Kafka consumer only read events for the specified dates.
- Store the data in a way that I can filter and run analysis on, ideally in a pandas DataFrame.
So my goal is to achieve the data flow from Kafka -> Pandas!
After numerous days of googling around StackOverflow, GitHub, and various sites, I finally got it working! Below are my code snippets on how to implement this in two of the most popular Python libraries for Kafka.
Prerequisite: pip install kafka-python
(The latest version 2.0.2 is used in my notebook)
First, we need to import the libraries below and set up the Kafka environment variables. The setup script below can be reused in Solution 2 with one minor change.
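Here’s a minimal sketch of what that setup looks like (the broker address, topic name, group id, and security protocol are placeholders; adjust them to your own cluster):

# Setup: imports and Kafka environment variables
import json
from datetime import datetime, timezone

import pandas as pd
from kafka import KafkaConsumer, TopicPartition   # Solution 2 swaps this import

# Kafka environment variables (placeholder values)
bootstrap_servers = 'broker1.example.com:9092'
topic = 'analytics__pageview'
group_id = 'jupyter-explorer'
security_protocol = 'SSL'
partition = 0          # with 6 partitions, any of 0-5 works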
Next, build a Kafka consumer to read events from a specific datetime range. There are 5 steps:
- Step 1: Since I know the test events were sent on 2022-09-22 between 12 pm and 2 pm (UTC), I use the datetime function to create dt_start and dt_end to bound the time range.
- Step 2: In Kafka, only events from the same partition are in order, so we need to read events from a specified partition. (Say you have 6 partitions for the topic; you can pick any number from 0-5 to use as the partition.)
- Step 3: The basic consumer requires topic, bootstrap_servers and group_id. I found that in a Jupyter notebook, if I don’t provide security_protocol it would throw errors.
- Step 4: This is the key! The way it works is like this:
– a datetime object → gets converted to a UTC timestamp in milliseconds → gets converted to the related offset number in a TopicPartition
– the key function is consumer.offsets_for_times({tp: dt_start.timestamp() * 1000})
- Step 5: Use seek to fetch events starting from the desired starting time
– each message has a property offset, and we compare it with the offset of the desired ending time to decide whether to continue or break
Enough talking, and here’s the full code →
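(A minimal sketch of the five steps put together; the broker address, topic name, group id, and security protocol are placeholders to adjust for your cluster.)

import json
from datetime import datetime, timezone

import pandas as pd
from kafka import KafkaConsumer, TopicPartition

# Step 1: bound the time range (test events: 2022-09-22, 12:00-14:00 UTC)
dt_start = datetime(2022, 9, 22, 12, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, tzinfo=timezone.utc)

# Step 2: events are only ordered within one partition, so pin one down
topic = 'analytics__pageview'          # placeholder topic name
tp = TopicPartition(topic, 0)          # any partition number works, e.g. 0-5

# Step 3: basic consumer; security_protocol avoids errors in Jupyter
consumer = KafkaConsumer(
    bootstrap_servers='broker1.example.com:9092',   # placeholder
    group_id='jupyter-explorer',                    # placeholder
    security_protocol='SSL',                        # placeholder, match your cluster
    enable_auto_commit=False,
)
consumer.assign([tp])

# Step 4: datetime -> UTC timestamp in ms -> offset number in the TopicPartition
# (assumes events exist at/after both timestamps, so both lookups resolve)
offset_start = consumer.offsets_for_times({tp: int(dt_start.timestamp() * 1000)})
offset_end = consumer.offsets_for_times({tp: int(dt_end.timestamp() * 1000)})

# Step 5: seek to the starting offset, then read until the ending offset
consumer.seek(tp, offset_start[tp].offset)
records = []
for msg in consumer:
    if msg.offset >= offset_end[tp].offset:
        break                          # passed the desired ending time
    records.append(json.loads(msg.value))
consumer.close()

df = pd.DataFrame(records)             # Kafka -> Pandas!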
And after that, I can query the writeKey of our new app in Pandas! 🐼
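For example, assuming writeKey landed as a top-level column of the DataFrame (the key value below is hypothetical):

# filter the events down to the new app's shared writeKey (hypothetical value)
new_app_df = df[df['writeKey'] == 'new-app-write-key']
new_app_df.head()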
The above solution is inspired by a similar question on StackOverflow. Actually, that is where I started an immense search and found there is no equivalent solution using confluent-kafka. Since my original code is based on confluent-kafka instead of kafka-python, I was puzzled by their seeming similarity yet nuanced differences.
Now I’m happy to introduce my own solution using confluent-kafka 😃~~~
Prerequisite: pip install confluent-kafka
(The latest version 1.9.2 is used in my notebook)
Here you can use the same setup script from Solution 1 with one minor change:
- change the import line (line 10 in my notebook) to from confluent_kafka import Consumer, TopicPartition
Next, we need to build a Kafka consumer to read events from a specific datetime range. At a high level, we still need the same 5 steps, but the main difference is that we need to use on_assign to achieve what seek does: fetch a specific offset from a TopicPartition.
Step 1: Same as Solution 1, we need datetime objects to bound the seek range.
Step 2: Same as Solution 1. One tricky thing is that most of the time you can use a string as the topic, such as topic = 'analytics__pageview', but when you want to subscribe, it only accepts a list, such as consumer.subscribe(['analytics__pageview'])! (As Dumbledore might say: “How peculiar~~” 🧙)
Step 3: Almost identical to Solution 1, except for changing = to : in the variable assignments.
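To illustrate the difference side by side (same placeholder values as before; note the config keys also switch from underscores to dots):

# kafka-python (Solution 1): keyword arguments, joined with =
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    bootstrap_servers='broker1.example.com:9092',
    group_id='jupyter-explorer',
    security_protocol='SSL',
)

# confluent-kafka (Solution 2): one config dict, keys and values joined with :
from confluent_kafka import Consumer
c = Consumer({
    'bootstrap.servers': 'broker1.example.com:9092',
    'group.id': 'jupyter-explorer',
    'security.protocol': 'SSL',
})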
Step 4: Here’s where you’ll see the nuanced differences! We need an extra step 4c to construct on_assign, as sketched below. This function originally comes from the GitHub confluent-kafka-python project, provided by Magnus Edenhill.
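A minimal sketch of that step (adapted from that pattern; dt_start comes from Step 1):

def on_assign(consumer, partitions):
    # stuff the desired starting timestamp (in ms) into each partition's offset field
    for p in partitions:
        p.offset = int(dt_start.timestamp() * 1000)
    # translate the timestamps into real offsets, then pin the consumer to them
    consumer.assign(consumer.offsets_for_times(partitions))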
Step 5: Instead of using seek, here we use subscribe with both the topic (in list form) and on_assign to fetch the offset of the desired starting time. And we need to call close() after fetching.
One more detail worth noting is how to get the offset number.
In kafka-python, you use offset_start[tp].offset, where offset_start is a dictionary:
offset_start = consumer.offsets_for_times({tp: dt_start.timestamp() * 1000})

# to print out the offset number
offset_start[tp].offset
Output: (notice the { }, indicating the type is dict)
In confluent-kafka, offset_start is a list, so you should use offset_start[0].offset:
tp_in = TopicPartition(topic=topic, partition=partition,
                       offset=int(dt_start.timestamp() * 1000))
offset_start = c.offsets_for_times([tp_in])

# to print out the offset number
offset_start[0].offset
Output: (notice the [ ], indicating the type is list)
Alrighty, here’s the full code for implementing this on confluent-kafka →
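(Again a minimal sketch, with the same placeholder assumptions as Solution 1.)

import json
from datetime import datetime, timezone

import pandas as pd
from confluent_kafka import Consumer, TopicPartition

# Step 1: datetime objects to bound the seek range (UTC)
dt_start = datetime(2022, 9, 22, 12, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, tzinfo=timezone.utc)

# Step 2: topic as a plain string here; subscribe() wants it in a list
topic = 'analytics__pageview'          # placeholder topic name
partition = 0

# Step 3: same settings as Solution 1, but as a config dict (= becomes :)
c = Consumer({
    'bootstrap.servers': 'broker1.example.com:9092',   # placeholder
    'group.id': 'jupyter-explorer',                    # placeholder
    'security.protocol': 'SSL',                        # placeholder
    'enable.auto.commit': False,
})

# Step 4 (incl. 4c): on_assign stands in for seek; it rewrites each assigned
# partition's offset from the starting timestamp before consuming begins
def on_assign(consumer, partitions):
    for p in partitions:
        p.offset = int(dt_start.timestamp() * 1000)    # timestamp in ms goes in
    consumer.assign(consumer.offsets_for_times(partitions))

# resolve the ending offset up front so we know when to stop
# (assumes events exist at/after dt_end, so the lookup resolves)
tp_end = TopicPartition(topic=topic, partition=partition,
                        offset=int(dt_end.timestamp() * 1000))
offset_end = c.offsets_for_times([tp_end])[0].offset

# Step 5: subscribe with the topic (in list form!) and on_assign, then poll
records = []
while True:
    msg = c.poll(timeout=10.0)
    if msg is None or msg.error():
        continue                       # nothing fetched yet, or a transient error
    if msg.partition() != partition:
        continue                       # only the pinned partition is in order
    if msg.offset() >= offset_end:
        break                          # passed the desired ending time
    records.append(json.loads(msg.value()))
c.close()                              # remember to close after fetching

df = pd.DataFrame(records)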
- It’s a trend to build event-driven applications, and I foresee a growing need for Data Scientists to be able to quickly process and do simple exploratory analysis on event data. This can help inform which data fields should be further transformed and loaded into the ETL pipeline, which should probably involve Faust and ksql rather than pandas.