
Handy Scheduler in Python. Small-scale handy method to… | by Oleg Żero | Jun, 2022


A small-scale, handy method for scheduling ETL tasks with Python.

An example of an eternally pending process, Poland 2021. (Photo by the author.)

Python has become an all-purpose language. It is especially common in analytics and in solving algorithmic problems within data science, but it is also popular in web development. This combination makes it a reasonable choice for various extract-transform-load (ETL) tasks.

However, many of these tasks are quite small and do not require large frameworks such as Airflow or Luigi. When polling one or a few web pages for data, a simple Python script plus crontab is more than sufficient. Still, when a project gets a little bigger, managing several jobs with cron may become cumbersome. At the same time, a bare installation of Airflow for "small jobs" needs at least 4 GB RAM and 2 CPUs (here). Thinking of AWS costs, that is at least a t2.small instance running constantly.
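For reference, the crontab approach mentioned above can be as simple as a single line; the script path and interval below are purely illustrative:

```shell
# Illustrative crontab entry: run a hypothetical ETL script every 15 minutes,
# appending stdout and stderr to a log file.
*/15 * * * * /usr/bin/python3 /home/user/etl/fetch_page.py >> /var/log/etl.log 2>&1
```

This works well for one or two jobs, but, as argued above, it scales poorly once schedules and parameters multiply.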

Is there anything in between? Small enough to run on, say, a t2.nano (very cheap), yet fairly "maintainable" and "extendable"?

In this post, I would like to share with you a simple approach that uses Python's schedule package with a few modifications.

The Python schedule library offers simple task scheduling. It is installable with pip and fairly easy to use. Unfortunately, the documentation does not provide examples of using it within a larger project:

import schedule
import time

def job():
    print("I'm working...")

# Run job every 3 seconds/minutes/hours/days/weeks,
# starting 3 seconds/minutes/hours/days/weeks from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# Run job every minute at the 23rd second
schedule.every().minute.at(":23").do(job)

# Run job every hour at the 42nd minute
schedule.every().hour.at(":42").do(job)

# Run jobs every 5th hour, 20 minutes and 30 seconds in.
# If current time is 02:00, first execution is at 06:20:30
schedule.every(5).hours.at("20:30").do(job)

# Run job every day at a specific HH:MM and next HH:MM:SS
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)

# Run job on a specific day of the week
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

As you can see, all functions are called at the level of the module, which is fine for a single script. However, if you have several different jobs, the code quickly becomes cluttered, especially if different callables require different parameters.

In other words, it would be preferable to take advantage of an object-oriented approach and define some "architecture" around it.

Let's say, for the sake of argument, that we have a set of dedicated ETL tasks, modeled using the following abstract class:

from abc import ABC, abstractmethod
from typing import Any, Dict, TypeVar

E = TypeVar("E", bound="BaseETL")

class BaseETL(ABC):
    def __init__(self, **kwargs: Dict) -> None:
        self.raw_data = None
        self.transformed_data = None

    @abstractmethod
    def extract(self, **kwargs: Dict) -> E:
        ...

    @abstractmethod
    def transform(self, **kwargs: Dict) -> E:
        ...

    @abstractmethod
    def load(self, **kwargs: Dict) -> Any:
        ...

    def run(self, **kwargs: Dict) -> None:
        self.extract(**kwargs).transform(**kwargs).load(**kwargs)

Any class that implements an ETL process would inherit from this base class. The extract method could, for example, fetch a website. Then transform would turn the raw HTML into a format acceptable to a database. Finally, load would save the data to the database. All methods, executed in this order, are wrapped by the run method.
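To make the chaining inside run work, extract and transform must return self. As a minimal, self-contained sketch (the InMemoryETL subclass and its fake payload are illustrative, not part of the article's project), a concrete subclass could look like this:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

class BaseETL(ABC):
    def __init__(self, **kwargs: Dict) -> None:
        self.raw_data = None
        self.transformed_data = None

    @abstractmethod
    def extract(self, **kwargs: Dict) -> "BaseETL": ...

    @abstractmethod
    def transform(self, **kwargs: Dict) -> "BaseETL": ...

    @abstractmethod
    def load(self, **kwargs: Dict) -> Any: ...

    def run(self, **kwargs: Dict) -> None:
        self.extract(**kwargs).transform(**kwargs).load(**kwargs)

class InMemoryETL(BaseETL):
    """Hypothetical subclass: 'extracts' a hard-coded payload instead of a web page."""

    def __init__(self) -> None:
        super().__init__()
        self.db: List[str] = []  # stands in for a real database

    def extract(self, **kwargs: Dict) -> "InMemoryETL":
        self.raw_data = "<li>alpha</li><li>beta</li>"  # pretend-fetched HTML
        return self  # returning self is what enables chaining in run()

    def transform(self, **kwargs: Dict) -> "InMemoryETL":
        # strip the pretend HTML tags into plain records
        self.transformed_data = [
            item.replace("</li>", "")
            for item in self.raw_data.split("<li>")
            if item
        ]
        return self

    def load(self, **kwargs: Dict) -> None:
        self.db.extend(self.transformed_data)

etl = InMemoryETL()
etl.run()
print(etl.db)  # -> ['alpha', 'beta']
```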

Now that the ETL classes are defined, we would like to schedule each of them via the schedule module in a clean fashion.

For brevity, in the following examples, let's skip the inheritance and focus only on the run method. Assume that their extract, transform and load methods are implemented elsewhere.

etl.py

class DummyETL:  # normally DummyETL(BaseETL)
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - not needed here
        self.init_param = init_param

    def run(self, p1: int, p2: int) -> None:
        name = self.__class__.__name__
        print(f"{name}({self.init_param}, p1={p1}, p2={p2})")

class EvenDummierETL:  # same...
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - same
        self.init_param = init_param

    def run(self, p1: int) -> None:
        name = self.__class__.__name__
        print(f"{name}({self.init_param}, p1={p1})")

The constructors' parameters can, for instance, specify the URLs of the pages to scrape. The run methods' parameters, in turn, can be used to pass secrets.

Now that we have the ETL classes defined, let's create a separate registry to associate the processes with some sort of schedule.

registry.py

import schedule

from etl import DummyETL, EvenDummierETL

def get_registry():
    dummy_etl = DummyETL(init_param=13)
    dummier_etl = EvenDummierETL(init_param=15)

    return [
        (dummy_etl, schedule.every(1).seconds),
        (dummier_etl, schedule.every(1).minutes.at(":05")),
    ]

The get_registry function is the place to define the schedule. Although the parameters' values are hard-coded here, you can imagine a scenario where the function loads them from a config file. Either way, it returns a list of tuples that matches the ETL objects with Jobs (from schedule). Note that this is our convention. The jobs are not yet associated with any particular Scheduler (again, from schedule). However, the convention allows us to do so in any other part of the project. We do not have to bind them to the module-level object, as shown in the documentation example.

Finally, let's create a new class that will activate the whole mechanism.

scheduler.py

import time
from typing import Dict, List, Tuple, TypeVar

from schedule import Job, Scheduler

from etl import DummyETL, EvenDummierETL
from etl import E  # we could also do this from e.g. etl.base

S = TypeVar("S", bound="TaskScheduler")

class TaskScheduler:
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        self.scheduler = Scheduler()
        self.registry = []

        for task, job in registry:
            self.registry.append(task)
            self.scheduler.jobs.append(job)

    def register(self, run_params: Dict) -> S:
        jobs = self.scheduler.get_jobs()
        for task, job in zip(self.registry, jobs):
            params = run_params.get(task.__class__.__name__)
            job.do(task.run, **params)

        return self

    def run(self, polling_seconds: int) -> None:
        while True:
            time.sleep(polling_seconds)
            self.scheduler.run_pending()

Our TaskScheduler uses composition: it creates a single Scheduler instance and adds the previously registered jobs to it. Although not enforced, we use typing to give a strong hint about what should be provided to the constructor to properly register the jobs. Then, the register method provides the binding between tasks and jobs. Last, but not least, run activates the machinery.

A script that uses this implementation would look like this:

run.py

from registry import get_registry
from scheduler import TaskScheduler

if __name__ == "__main__":
    run_params = {
        "DummyETL": dict(p1=1, p2=2),  # e.g. from env vars
        "EvenDummierETL": dict(p1=3),
    }

    registry = get_registry()  # e.g. from script's args or config
    task_scheduler = TaskScheduler(registry).register(run_params)
    task_scheduler.run(polling_seconds=1)

Probably the weakest point of this solution is the convention that uses __class__.__name__ as keys in the run_params dictionary. However, considering the simplicity of the approach, it may be OK, especially if these parameters can be defined at runtime. There are many alternatives, one of which could be creating an additional abstraction layer with e.g. objects like DummyTask that would serve as a bridge between the ETL objects and the registry.
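A hedged sketch of what such a bridge object might look like; the Task class, its name field, and the DummyETL stub below are hypothetical illustrations, not code from the article:

```python
from typing import Dict

class Task:
    """Hypothetical bridge: pairs an ETL instance with a stable, explicit name,
    so the registry no longer depends on __class__.__name__."""

    def __init__(self, name: str, etl) -> None:
        self.name = name
        self.etl = etl

    def run(self, run_params: Dict) -> None:
        # look up this task's own parameters before delegating to the ETL
        params = run_params.get(self.name, {})
        self.etl.run(**params)

class DummyETL:  # stub standing in for a real ETL class
    def __init__(self, init_param: int) -> None:
        self.init_param = init_param
        self.last_args = None

    def run(self, p1: int, p2: int) -> None:
        self.last_args = (p1, p2)

task = Task("dummy", DummyETL(init_param=13))
task.run({"dummy": dict(p1=1, p2=2)})
print(task.etl.last_args)  # -> (1, 2)
```

With this in place, the scheduler would register task.run and pass the whole run_params dictionary, letting each Task pick out its own slice by its explicit name.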

Coming back to the TaskScheduler, we can also define it through inheritance as opposed to composition (as before). That would mean expanding the functionality of schedule's native Scheduler class. In this case, the TaskScheduler would be the following:

class TaskScheduler(Scheduler):  # <- here
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        super().__init__()  # <- here
        self.registry = []

        for task, job in registry:
            self.registry.append(task)
            self.jobs.append(job)  # <- here

    def register(self, run_params: Dict) -> S:
        jobs = self.get_jobs()  # <- here
        for task, job in zip(self.registry, jobs):
            params = run_params.get(task.__class__.__name__)
            job.do(task.run, **params)

        return self

    def run(self, polling_seconds: int) -> None:
        while True:
            time.sleep(polling_seconds)
            self.run_pending()  # <- and here

You decide which way is better, if any ;).

In this brief article, we have shown how the simple schedule module can be extended to create a small ETL working machine. Most importantly, the approach allows for better organization of the code within a small project without having to bring in the big guns.
