A small-scale, handy approach to scheduling ETL tasks with Python.
Python has become an all-purpose language. It is particularly common in analytics and in solving algorithmic problems within data science, but it is also popular in web development. This combination makes it a reasonable choice for various extract-transform-load (ETL) tasks.
However, many of these tasks are quite small and don't require large frameworks such as Airflow or Luigi. When polling one or a few web pages for data, a simple Python script plus crontab is more than sufficient. Still, when a project gets a little bigger, managing several jobs using cron may become cumbersome. At the same time, a bare installation of Airflow for "small jobs" requires at least 4 GB of RAM and 2 CPUs. Thinking of AWS costs, that is at least a t2.small instance running constantly.
Is there anything in between? Small enough to use, say, a t2.nano (very cheap), yet fairly "maintainable" and "extendable"?
In this post, I would like to share with you a simple approach that uses Python's schedule package, with a few modifications.
The Python schedule library offers simple task scheduling. It is installable using pip, and fairly easy to use. Unfortunately, the documentation does not provide examples of using it within a larger project:
import schedule
import time


def job():
    print("I'm working...")


# Run job every 3 seconds/minutes/hours/days/weeks,
# starting 3 seconds/minutes/hours/days/weeks from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# Run job every minute at the 23rd second
schedule.every().minute.at(":23").do(job)

# Run job every hour at the 42nd minute
schedule.every().hour.at(":42").do(job)

# Run jobs every 5th hour, 20 minutes and 30 seconds in.
# If current time is 02:00, first execution is at 06:20:30
schedule.every(5).hours.at("20:30").do(job)

# Run job every day at a specific HH:MM and next HH:MM:SS
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)

# Run job on a specific day of the week
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
As you can see, all functions are called at the module level, which is fine for a single script. However, if you have several different jobs, the code quickly becomes cluttered, especially if the different callables require different parameters.
In other words, it would be preferable to take advantage of the object-oriented approach and define some "architecture" around it.
Let's say, for the sake of argument, that we have a set of dedicated ETL tasks, modeled using the following abstract class:
from abc import ABC, abstractmethod
from typing import Any, Dict, TypeVar

E = TypeVar("E", bound="BaseETL")


class BaseETL(ABC):
    def __init__(self, **kwargs: Dict) -> None:
        self.raw_data = None
        self.transformed_data = None

    @abstractmethod
    def extract(self, **kwargs: Dict) -> E:
        ...

    @abstractmethod
    def transform(self, **kwargs: Dict) -> E:
        ...

    @abstractmethod
    def load(self, **kwargs: Dict) -> Any:
        ...

    def run(self, **kwargs: Dict) -> None:
        self.extract(**kwargs).transform(**kwargs).load(**kwargs)
Any class that implements an ETL process would inherit from this base class. The extract method could, for example, fetch a website. Then transform would convert the raw HTML into a format acceptable to a database. Finally, load would save the data to the database. All three methods, executed in this order, are wrapped by the run method.
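To make the chaining concrete, here is a minimal, hypothetical subclass (the QuotesETL name and its fake HTML payload are inventions for illustration; the base class is repeated so the snippet runs on its own). Note how extract and transform return self, which is what makes the chaining inside run work:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseETL(ABC):
    def __init__(self, **kwargs: Dict) -> None:
        self.raw_data = None
        self.transformed_data = None

    @abstractmethod
    def extract(self, **kwargs: Dict) -> "BaseETL":
        ...

    @abstractmethod
    def transform(self, **kwargs: Dict) -> "BaseETL":
        ...

    @abstractmethod
    def load(self, **kwargs: Dict) -> Any:
        ...

    def run(self, **kwargs: Dict) -> None:
        self.extract(**kwargs).transform(**kwargs).load(**kwargs)


class QuotesETL(BaseETL):
    """Hypothetical task: 'scrapes' a page and stores upper-cased text."""

    def extract(self, **kwargs: Dict) -> "QuotesETL":
        # A real task could do e.g. requests.get(url).text here
        self.raw_data = "<html>stay hungry</html>"
        return self  # returning self enables the chaining in run()

    def transform(self, **kwargs: Dict) -> "QuotesETL":
        text = self.raw_data.replace("<html>", "").replace("</html>", "")
        self.transformed_data = text.upper()
        return self

    def load(self, **kwargs: Dict) -> Any:
        # A real task would insert into a database here
        print(self.transformed_data)
        return self.transformed_data


QuotesETL().run()  # prints: STAY HUNGRY
```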
Now that the ETL classes are defined, we would like to schedule each of them via the schedule module in a clean fashion.
For brevity, in the following examples, let's skip the inheritance and focus only on the run method. Assume that their extract, transform, and load methods are implemented elsewhere.
etl.py
class DummyETL:  # normally DummyETL(BaseETL)
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - not needed here
        self.init_param = init_param

    def run(self, p1: int, p2: int) -> None:
        name = self.__class__.__name__
        print(f"{name}({self.init_param}, p1={p1}, p2={p2})")


class EvenDummierETL:  # same...
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - same
        self.init_param = init_param

    def run(self, p1: int) -> None:
        name = self.__class__.__name__
        print(f"{name}({self.init_param}, p1={p1})")
The constructors' parameters can, for instance, specify the URLs of the pages to scrape. The run methods' parameters, in turn, can be used to pass secrets.
Now that we have the ETL classes defined, let's create a separate registry to associate the processes with some sort of schedule.
registry.py
import schedule

from etl import DummyETL, EvenDummierETL


def get_registry():
    dummy_etl = DummyETL(init_param=13)
    dummier_etl = EvenDummierETL(init_param=15)
    return [
        (dummy_etl, schedule.every(1).seconds),
        (dummier_etl, schedule.every(1).minutes.at(":05")),
    ]
The get_registry function is the place to define the schedule. Although the parameters' values are hard-coded here, you can think of a scenario where the function loads them from a config file. Either way, it returns a list of tuples that matches the ETL objects with Jobs (from schedule). Note that this is our convention. The jobs are not yet associated with any particular Scheduler (again, from schedule). However, the convention allows us to do so in any other part of the project. We don't have to bind them to the module-level object, as shown in the documentation example.
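As a sketch of the config-file scenario: the JSON layout, the CLASSES lookup table, and the every_seconds key below are all assumptions for illustration. The schedule spec is kept as plain data, so the conversion to a real Job (e.g. schedule.every(n).seconds) can happen at bind time:

```python
import json

# Hypothetical config-driven variant of get_registry. The stripped-down
# ETL classes are stand-ins for those defined in etl.py.


class DummyETL:
    def __init__(self, init_param: int) -> None:
        self.init_param = init_param


class EvenDummierETL:
    def __init__(self, init_param: int) -> None:
        self.init_param = init_param


# Maps class names in the config to the actual classes
CLASSES = {"DummyETL": DummyETL, "EvenDummierETL": EvenDummierETL}

CONFIG = """
[
  {"class": "DummyETL", "init_param": 13, "every_seconds": 1},
  {"class": "EvenDummierETL", "init_param": 15, "every_seconds": 60}
]
"""


def get_registry(raw: str = CONFIG):
    entries = json.loads(raw)
    # To obtain real Jobs, replace e["every_seconds"] with e.g.
    # schedule.every(e["every_seconds"]).seconds at bind time.
    return [
        (CLASSES[e["class"]](init_param=e["init_param"]), e["every_seconds"])
        for e in entries
    ]
```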
Finally, let's create a new class that will activate the whole mechanism.
scheduler.py
import time
from typing import Dict, List, Tuple, TypeVar

from schedule import Job, Scheduler

from etl import DummyETL, EvenDummierETL
from etl import E  # we could also import it from e.g. etl.base

S = TypeVar("S", bound="TaskScheduler")


class TaskScheduler:
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        self.scheduler = Scheduler()
        self.registry = []
        for task, job in registry:
            self.registry.append(task)
            self.scheduler.jobs.append(job)

    def register(self, run_params: Dict) -> S:
        jobs = self.scheduler.get_jobs()
        for task, job in zip(self.registry, jobs):
            params = run_params.get(task.__class__.__name__)
            job.do(task.run, **params)
        return self

    def run(self, polling_seconds: int = 1) -> None:
        while True:
            time.sleep(polling_seconds)
            self.scheduler.run_pending()
Our TaskScheduler uses composition to create a single Scheduler instance and add the previously registered jobs to it. Although not enforced, we use typing to give a strong hint about what should be passed to the constructor to register the jobs properly. Then, the register method provides the binding. Last, but not least, run activates the machinery.
A script that uses this implementation would look like this:
run.py
from registry import get_registry
from scheduler import TaskScheduler

if __name__ == "__main__":
    run_params = {
        "DummyETL": dict(p1=1, p2=2),  # e.g. from env vars
        "EvenDummierETL": dict(p1=3),
    }
    registry = get_registry()  # e.g. from script's args or config
    task_scheduler = TaskScheduler(registry).register(run_params)
    task_scheduler.run()
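The "e.g. from env vars" remark can be sketched as follows; the variable names DUMMY_P1, DUMMY_P2, and DUMMIER_P1 (and their defaults) are assumptions for illustration:

```python
import os

# Hypothetical: build run_params from environment variables instead of
# hard-coded literals, falling back to defaults when a variable is unset.
run_params = {
    "DummyETL": dict(
        p1=int(os.environ.get("DUMMY_P1", "1")),
        p2=int(os.environ.get("DUMMY_P2", "2")),
    ),
    "EvenDummierETL": dict(
        p1=int(os.environ.get("DUMMIER_P1", "3")),
    ),
}
```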
Probably the weakest point of this solution is the convention that uses __class__.__name__ as keys in the run_params dictionary. However, considering the simplicity of the approach, it may be OK, especially if these parameters can be defined at runtime. There are many alternatives, one of which could be creating an additional abstraction layer with e.g. objects like DummyTask that would serve as a bridge between the ETL objects and the registry.
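A minimal, hypothetical sketch of that bridge idea: a small Task wrapper carries its own run parameters, so string class names are no longer needed as dictionary keys. The Task name and its fields are assumptions, and the DummyETL stub stands in for the real class:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Task:
    """Hypothetical bridge between an ETL object and the registry."""
    etl: Any                              # an object with a .run method
    run_params: Dict = field(default_factory=dict)

    def run(self) -> Any:
        # Forward the stored parameters, removing the need for
        # __class__.__name__ lookups at registration time.
        return self.etl.run(**self.run_params)


class DummyETL:
    def __init__(self, init_param: int) -> None:
        self.init_param = init_param

    def run(self, p1: int, p2: int) -> str:
        line = f"DummyETL({self.init_param}, p1={p1}, p2={p2})"
        print(line)
        return line


task = Task(DummyETL(13), run_params=dict(p1=1, p2=2))
task.run()  # prints: DummyETL(13, p1=1, p2=2)
```

The registry could then hold (task, job) pairs, and register would simply call job.do(task.run) with no extra arguments.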
Coming back to the TaskScheduler, we can also define it through inheritance, as opposed to composition (as before). That would mean extending the functionality of schedule's native Scheduler class. In this case, the TaskScheduler would be the following:
class TaskScheduler(Scheduler):  # <- here
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        super().__init__()  # <- here
        self.registry = []
        for task, job in registry:
            self.registry.append(task)
            self.jobs.append(job)  # <- here

    def register(self, run_params: Dict) -> S:
        jobs = self.get_jobs()  # <- here
        for task, job in zip(self.registry, jobs):
            params = run_params.get(task.__class__.__name__)
            job.do(task.run, **params)
        return self

    def run(self, polling_seconds: int = 1) -> None:
        while True:
            time.sleep(polling_seconds)
            self.run_pending()  # <- and here
You decide which way is better, if any ;).
In this brief article, we have shown how the simple schedule module can be extended to create a small ETL working machine. Most importantly, the approach makes it easier to organize the code within a small project without having to reach for the big guns.