From 9a3356ad0d0046d45788113729937144b910831d Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 25 Aug 2021 16:18:53 -0700 Subject: [PATCH] add missing scheduler! --- backend/scheduler.py | 61 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 backend/scheduler.py diff --git a/backend/scheduler.py b/backend/scheduler.py new file mode 100644 index 00000000..773dddb6 --- /dev/null +++ b/backend/scheduler.py @@ -0,0 +1,61 @@ +""" Standalone scheduler app for Docker deployment""" + +# import json + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.jobstores.mongodb import MongoDBJobStore +from pymongo import MongoClient + +from db import DATABASE_URL + +# pylint: disable=invalid-name +global_trigger_q = None + + +def trigger_crawl(**kwargs): + """ send crawl trigger message """ + print("crawl triggered", kwargs, flush=True) + global_trigger_q.put(kwargs) + + +def run_scheduler(event_q, trigger_q): + """ init scheduler + start tcp server """ + + # pylint: disable=global-statement + global global_trigger_q + global_trigger_q = trigger_q + + print("Initializing Scheduler...", flush=True) + + scheduler = BackgroundScheduler() + + mongoclient = MongoClient(DATABASE_URL) + + scheduler.add_jobstore(MongoDBJobStore(client=mongoclient)) + + scheduler.start() + + print("Scheduler Ready", flush=True) + + while True: + msg = event_q.get() + + try: + if msg.get("schedule"): + print(f"Setting Schedule: {msg['cid']} {msg['schedule']}", flush=True) + scheduler.add_job( + func=trigger_crawl, + trigger=CronTrigger.from_crontab(msg["schedule"]), + id=msg["cid"], + kwargs={"cid": msg["cid"], "schedule": msg["schedule"]}, + replace_existing=True, + ) + + else: + print(f"Removing Schedule: {msg['cid']}", flush=True) + scheduler.remove_job(job_id=msg["cid"]) + + # pylint: disable=broad-except + except Exception as exc: + print(exc)