diff --git a/backend/btrixcloud/db.py b/backend/btrixcloud/db.py index 0b62c645..0153401e 100644 --- a/backend/btrixcloud/db.py +++ b/backend/btrixcloud/db.py @@ -10,7 +10,10 @@ import motor.motor_asyncio from pydantic import BaseModel, UUID4 from pymongo.errors import InvalidName -from .worker import by_one_worker +from .migrations import BaseMigration + + +CURR_DB_VERSION = "0005" # ============================================================================ @@ -47,7 +50,6 @@ def init_db(): # ============================================================================ -@by_one_worker("/app/btrixcloud/worker-pid.file") async def update_and_prepare_db( # pylint: disable=R0913 mdb, @@ -64,9 +66,9 @@ async def update_and_prepare_db( - Create/update superuser - Create/update default org - Run all tasks in order in a single worker. """ - if await run_db_migrations(mdb): + print("Database setup started", flush=True) + if await run_db_migrations(mdb, user_manager): await drop_indexes(mdb) await create_indexes(org_ops, crawl_config_ops, coll_ops, invite_ops) await user_manager.create_super_user() @@ -75,8 +77,19 @@ async def update_and_prepare_db( # ============================================================================ -async def run_db_migrations(mdb): +async def run_db_migrations(mdb, user_manager): """Run database migrations.""" + + # if first run, just set version and exit + if not await user_manager.get_superuser(): + base_migration = BaseMigration(mdb, CURR_DB_VERSION) + await base_migration.set_db_version() + print( + "New DB, no migration needed, set version to: " + CURR_DB_VERSION, + flush=True, + ) + return False + migrations_run = False migrations_path = "/app/btrixcloud/migrations" module_files = [ diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py index db815938..c543563b 100644 --- a/backend/btrixcloud/main.py +++ b/backend/btrixcloud/main.py @@ -3,9 +3,8 @@ main file for browsertrix-api system supports docker and kubernetes based deployments of multiple browsertrix-crawlers """ import os -import signal -import sys import asyncio +import sys from fastapi import FastAPI from fastapi.routing import APIRouter @@ -26,8 +25,8 @@ from .colls import init_collections_api from .crawls import init_crawls_api from .crawlmanager import CrawlManager +from .utils import run_once_lock, register_exit_handler -# pylint: disable=duplicate-code API_PREFIX = "/api" app_root = FastAPI( @@ -111,11 +110,13 @@ def main(): crawl_config_ops.set_coll_ops(coll_ops) - asyncio.create_task( - update_and_prepare_db( - mdb, user_manager, org_ops, crawl_config_ops, coll_ops, invites + # run only in first worker + if run_once_lock("btrix-init-db"): + asyncio.create_task( + update_and_prepare_db( + mdb, user_manager, org_ops, crawl_config_ops, coll_ops, invites + ) ) - ) app.include_router(org_ops.router) @@ -140,13 +141,5 @@ def main(): @app_root.on_event("startup") async def startup(): """init on startup""" - loop = asyncio.get_running_loop() - loop.add_signal_handler(signal.SIGTERM, exit_handler) - + register_exit_handler() main() - - -def exit_handler(): - """sigterm handler""" - print("SIGTERM received, exiting") - sys.exit(1) diff --git a/backend/btrixcloud/main_op.py b/backend/btrixcloud/main_op.py index 21655aff..73077104 100644 --- a/backend/btrixcloud/main_op.py +++ b/backend/btrixcloud/main_op.py @@ -1,21 +1,16 @@ """ entrypoint module for operator """ -import signal -import sys -import asyncio from fastapi import FastAPI + from .operator import init_operator_webhook +from .utils import register_exit_handler -API_PREFIX = "/api" -app_root = FastAPI( - docs_url=API_PREFIX + "/docs", - redoc_url=API_PREFIX + "/redoc", - openapi_url=API_PREFIX + "/openapi.json", -) +app_root = FastAPI() +# ============================================================================ def main(): """main init""" init_operator_webhook(app_root) @@ -25,13 +20,5 @@ def main(): @app_root.on_event("startup") async def startup(): """init on startup""" - loop = asyncio.get_running_loop() - loop.add_signal_handler(signal.SIGTERM, exit_handler) - + register_exit_handler() main() - - -def exit_handler(): - """sigterm handler""" - print("SIGTERM received, exiting") - sys.exit(1) diff --git a/backend/btrixcloud/main_scheduled_job.py b/backend/btrixcloud/main_scheduled_job.py index e2434af6..f4cb04bf 100644 --- a/backend/btrixcloud/main_scheduled_job.py +++ b/backend/btrixcloud/main_scheduled_job.py @@ -8,6 +8,7 @@ from .k8sapi import K8sAPI from .db import init_db from .crawlconfigs import get_crawl_config, inc_crawl_count from .crawls import add_new_crawl +from .utils import register_exit_handler # ============================================================================ @@ -58,4 +59,5 @@ def main(): if __name__ == "__main__": + register_exit_handler() main() diff --git a/backend/btrixcloud/users.py b/backend/btrixcloud/users.py index e827d01e..a9a8ae90 100644 --- a/backend/btrixcloud/users.py +++ b/backend/btrixcloud/users.py @@ -156,6 +156,10 @@ class UserManager(BaseUserManager[UserCreate, UserDB]): ) return await cursor.to_list(length=1000) + async def get_superuser(self): + """return current superuser, if any""" + return await self.user_db.collection.find_one({"is_superuser": True}) + async def create_super_user(self): """Initialize a super user from env vars""" email = os.environ.get("SUPERUSER_EMAIL") @@ -167,9 +171,7 @@ class UserManager(BaseUserManager[UserCreate, UserDB]): if not password: password = passlib.pwd.genword() - curr_superuser_res = await self.user_db.collection.find_one( - {"is_superuser": True} - ) + curr_superuser_res = await self.get_superuser() if curr_superuser_res: user = UserDB(**curr_superuser_res) update = {"password": password} diff --git a/backend/btrixcloud/utils.py b/backend/btrixcloud/utils.py index 03ff0c7c..c82f48c5 100644 --- a/backend/btrixcloud/utils.py +++ b/backend/btrixcloud/utils.py @@ -1,6 +1,11 @@ """ k8s utils """ import os +import asyncio +import sys +import signal +import atexit + from datetime import datetime from redis import asyncio as exceptions @@ -46,3 +51,35 @@ async def get_redis_crawl_stats(redis, crawl_id): stats = {"found": pages_found, "done": pages_done, "size": archive_size} return stats + + +def run_once_lock(name): + """run once lock via temp directory + - if dir doesn't exist, return true + - if exists, return false""" + lock_dir = "/tmp/." + name + try: + os.mkdir(lock_dir) + # pylint: disable=bare-except + except: + return False + + # just in case, delete dir on exit + def del_dir(): + print("release lock: " + lock_dir, flush=True) + os.rmdir(lock_dir) + + atexit.register(del_dir) + return True + + +def register_exit_handler(): + """register exit handler to exit on SIGTERM""" + loop = asyncio.get_running_loop() + + def exit_handler(): + """sigterm handler""" + print("SIGTERM received, exiting") + sys.exit(1) + + loop.add_signal_handler(signal.SIGTERM, exit_handler) diff --git a/backend/btrixcloud/worker.py b/backend/btrixcloud/worker.py deleted file mode 100644 index 749dc8be..00000000 --- a/backend/btrixcloud/worker.py +++ /dev/null @@ -1,47 +0,0 @@ -""" -Unique Worker exposed as decorator by_one_worker -""" - -from pathlib import Path -import os -from functools import cached_property - - -class UniqueWorker: - """Class to run async tasks in single worker only.""" - - def __init__(self, path): - self.path = Path(path) - self.pid = str(os.getpid()) - self.set_id() - - def set_id(self): - """Create path to pid file and write to pid.""" - if not self.path.exists(): - self.path.parents[0].mkdir(parents=True, exist_ok=True) - - with open(self.path, "w", encoding="utf-8") as pid_file: - pid_file.write(self.pid) - - @cached_property - def is_assigned(self): - """Check if worker has been assigned to unique worker.""" - with open(self.path, "r", encoding="utf-8") as pid_file: - assigned_worker = pid_file.read() - - return assigned_worker == self.pid - - -def by_one_worker(worker_pid_path): - """Decorator which runs function in unique worker.""" - unique_worker = UniqueWorker(worker_pid_path) - - def deco(pid_path): - def wrapped(*args, **kwargs): - if not unique_worker.is_assigned: - return "" - return pid_path(*args, **kwargs) - - return wrapped - - return deco