48 lines
1.2 KiB
Python
48 lines
1.2 KiB
Python
"""
|
|
Unique worker, exposed as the decorator ``by_one_worker``.
|
|
"""
|
|
|
|
import os
from functools import cached_property, wraps
from pathlib import Path
|
|
|
|
|
|
class UniqueWorker:
    """Mark one worker process as the unique worker via a shared PID file.

    Each instance records the current process ID in the file at ``path`` and
    can later report whether that file still names this process.
    """

    def __init__(self, path):
        self.path = Path(path)
        # Stored as text so it compares directly with the file's contents.
        self.pid = str(os.getpid())
        self.set_id()

    def set_id(self):
        """Make sure the PID file's directory exists and store this PID."""
        pid_path = self.path
        if not pid_path.exists():
            pid_path.parent.mkdir(parents=True, exist_ok=True)

        # The write is unconditional: whichever process registers most
        # recently is the one named in the file.
        pid_path.write_text(self.pid, encoding="utf-8")

    @cached_property
    def is_assigned(self):
        """Whether the PID file names this process.

        The file is read on first access only; the answer is cached on the
        instance afterwards.
        """
        recorded_pid = self.path.read_text(encoding="utf-8")
        return recorded_pid == self.pid
|
|
|
|
|
|
def by_one_worker(worker_pid_path):
    """Build a decorator that runs the decorated function in one worker only.

    A ``UniqueWorker`` is created for ``worker_pid_path`` as soon as this
    factory is called (its constructor calls ``set_id()``), so the PID file
    is touched at decoration time, not at call time.

    Args:
        worker_pid_path: Location of the PID file shared by the workers.

    Returns:
        A decorator. The decorated function returns the original function's
        result when ``unique_worker.is_assigned`` is true; otherwise the
        original function is not called and ``""`` is returned.
    """
    unique_worker = UniqueWorker(worker_pid_path)

    def deco(func):
        # wraps() keeps the decorated function's __name__, __doc__ and
        # __wrapped__ so logs and introspection still see the real callable.
        @wraps(func)
        def wrapped(*args, **kwargs):
            if not unique_worker.is_assigned:
                return ""
            return func(*args, **kwargs)

        return wrapped

    return deco
|