From 8281ba723eb794b549a086d65a68a35d342878ad Mon Sep 17 00:00:00 2001
From: Tessa Walsh
Date: Fri, 5 May 2023 18:12:52 -0400
Subject: [PATCH] Pre-compute workflow last crawl information (#812)

* Precompute config crawl stats

* Includes a database migration to move previously dynamically computed
  crawl stats for workflows into the CrawlConfig model.

* Add crawls.finished descending index

* Add last crawl fields to workflow tests
---
 backend/btrixcloud/crawlconfigs.py            | 204 ++++++++----------
 backend/btrixcloud/crawls.py                  |  64 ++----
 backend/btrixcloud/db.py                      |   8 +-
 backend/btrixcloud/main.py                    |   2 +-
 .../migration_0006_precompute_crawl_stats.py  |  31 +++
 backend/btrixcloud/operator.py                |   7 +-
 backend/test/test_crawlconfigs.py             |  20 +-
 7 files changed, 170 insertions(+), 166 deletions(-)
 create mode 100644 backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py

diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 0220bfe2..907fd88b 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -178,6 +178,16 @@ class CrawlConfig(CrawlConfigCore):
 
     rev: int = 0
 
+    crawlCount: Optional[int] = 0
+    totalSize: Optional[int] = 0
+
+    lastCrawlId: Optional[str]
+    lastCrawlStartTime: Optional[datetime]
+    lastStartedBy: Optional[UUID4]
+    lastCrawlTime: Optional[datetime]
+    lastCrawlState: Optional[str]
+    lastCrawlSize: Optional[int]
+
     def get_raw_config(self):
         """serialize config for browsertrix-crawler"""
         return self.config.dict(exclude_unset=True, exclude_none=True)
@@ -200,15 +210,6 @@ class CrawlConfigOut(CrawlConfig):
 
     firstSeed: Optional[str]
 
-    totalSize: Optional[int] = 0
-
-    crawlCount: Optional[int] = 0
-    lastCrawlId: Optional[str]
-    lastCrawlStartTime: Optional[datetime]
-    lastCrawlTime: Optional[datetime]
-    lastCrawlState: Optional[str]
-    lastCrawlSize: Optional[int]
-
 
 # ============================================================================
 class CrawlConfigIdNameOut(BaseMongoModel):
@@ -241,6 +242,7 @@ class CrawlConfigOps:
 
     def __init__(self, dbclient, mdb, user_manager, org_ops, crawl_manager, profiles):
         self.dbclient = dbclient
+        self.crawls = mdb["crawls"]
         self.crawl_configs = mdb["crawl_configs"]
         self.config_revs = mdb["configs_revs"]
         self.user_manager = user_manager
@@ -264,7 +266,7 @@ class CrawlConfigOps:
         self.crawl_ops = ops
 
     async def init_index(self):
-        """init index for crawls db"""
+        """init index for crawlconfigs db collection"""
        await self.crawl_configs.create_index(
             [("oid", pymongo.HASHED), ("inactive", pymongo.ASCENDING)]
         )
@@ -509,84 +511,6 @@ class CrawlConfigOps:
             # Set firstSeed
             {"$set": {"firstSeed": "$firstSeedObject.url"}},
             {"$unset": ["firstSeedObject"]},
-            {
-                "$lookup": {
-                    "from": "crawls",
-                    "localField": "_id",
-                    "foreignField": "cid",
-                    "as": "configCrawls",
-                },
-            },
-            # Filter workflow crawls on finished and active
-            {
-                "$set": {
-                    "finishedCrawls": {
-                        "$filter": {
-                            "input": "$configCrawls",
-                            "as": "filterCrawls",
-                            "cond": {
-                                "$and": [
-                                    {"$ne": ["$$filterCrawls.finished", None]},
-                                    {"$ne": ["$$filterCrawls.inactive", True]},
-                                ]
-                            },
-                        }
-                    }
-                }
-            },
-            # Set crawl count to number of finished crawls
-            {"$set": {"crawlCount": {"$size": "$finishedCrawls"}}},
-            # Sort finished crawls by finished time descending to get latest
-            {
-                "$set": {
-                    "sortedCrawls": {
-                        "$sortArray": {
-                            "input": "$finishedCrawls",
-                            "sortBy": {"finished": -1},
-                        }
-                    }
-                }
-            },
-            {"$unset": ["finishedCrawls"]},
-            {"$set": {"lastCrawl": {"$arrayElemAt": ["$sortedCrawls", 0]}}},
-            {"$set": {"lastCrawlId": "$lastCrawl._id"}},
-            {"$set": {"lastCrawlStartTime": "$lastCrawl.started"}},
-            {"$set": {"lastCrawlTime": "$lastCrawl.finished"}},
-            {"$set": {"lastCrawlState": "$lastCrawl.state"}},
-            # Get userid of last started crawl
-            {"$set": {"lastStartedBy": "$lastCrawl.userid"}},
-            {"$set": {"lastCrawlSize": {"$sum": "$lastCrawl.files.size"}}},
-            {
-                "$lookup": {
-                    "from": "users",
-                    "localField": "lastStartedBy",
-                    "foreignField": "id",
-                    "as": "lastStartedByName",
-                },
-            },
-            {
-                "$set": {
-                    "lastStartedByName": {
-                        "$arrayElemAt": ["$lastStartedByName.name", 0]
-                    }
-                }
-            },
-            {
-                "$set": {
-                    "totalSize": {
-                        "$sum": {
-                            "$map": {
-                                "input": "$sortedCrawls.files",
-                                "as": "crawlFile",
-                                "in": {"$arrayElemAt": ["$$crawlFile.size", 0]},
-                            }
-                        }
-                    }
-                }
-            },
-            # unset
-            {"$unset": ["lastCrawl"]},
-            {"$unset": ["sortedCrawls"]},
         ]
 
         if first_seed:
@@ -611,6 +535,19 @@ class CrawlConfigOps:
                 },
             },
             {"$set": {"createdByName": {"$arrayElemAt": ["$userName.name", 0]}}},
+            {
+                "$lookup": {
+                    "from": "users",
+                    "localField": "lastStartedBy",
+                    "foreignField": "id",
+                    "as": "startedName",
+                },
+            },
+            {
+                "$set": {
+                    "lastStartedByName": {"$arrayElemAt": ["$startedName.name", 0]}
+                }
+            },
             {
                 "$lookup": {
                     "from": "users",
@@ -648,22 +585,12 @@ class CrawlConfigOps:
         except (IndexError, ValueError):
             total = 0
 
-        # crawls = await self.crawl_manager.list_running_crawls(oid=org.id)
-        crawls, _ = await self.crawl_ops.list_crawls(
-            org=org,
-            running_only=True,
-            # Set high so that when we lower default we still get all running crawls
-            page_size=1_000,
-        )
-        running = {}
-        for crawl in crawls:
-            running[crawl.cid] = crawl
-
         configs = []
         for res in items:
             config = CrawlConfigOut.from_dict(res)
             # pylint: disable=invalid-name
-            self._add_curr_crawl_stats(config, running.get(config.id))
+            if not config.inactive:
+                self._add_curr_crawl_stats(config, await self.get_running_crawl(config))
             configs.append(config)
 
         return configs, total
@@ -693,20 +620,9 @@ class CrawlConfigOps:
 
         return None
 
-    async def _annotate_with_crawl_stats(self, crawlconfig: CrawlConfigOut):
-        """Annotate crawlconfig with information about associated crawls"""
-        crawl_stats = await self.crawl_ops.get_latest_crawl_and_count_by_config(
-            cid=crawlconfig.id
-        )
-        crawlconfig.crawlCount = crawl_stats["crawl_count"]
-        crawlconfig.totalSize = crawl_stats["total_size"]
-        crawlconfig.lastCrawlId = crawl_stats["last_crawl_id"]
-        crawlconfig.lastCrawlStartTime = crawl_stats["last_crawl_started"]
-        crawlconfig.lastCrawlTime = crawl_stats["last_crawl_finished"]
-        crawlconfig.lastStartedByName = crawl_stats["last_started_by"]
-        crawlconfig.lastCrawlState = crawl_stats["last_crawl_state"]
-        crawlconfig.lastCrawlSize = crawl_stats["last_crawl_size"]
-        return crawlconfig
+    async def update_crawl_stats(self, cid: uuid.UUID):
+        """Update crawl count, total size, and last crawl information for config."""
+        await update_config_crawl_stats(self.crawl_configs, self.crawls, cid)
 
     def _add_curr_crawl_stats(self, crawlconfig, crawl):
         """Add stats from current running crawl, if any"""
@@ -745,13 +661,17 @@ class CrawlConfigOps:
         if modified_user:
             crawlconfig.modifiedByName = modified_user.name
 
+        if crawlconfig.lastStartedBy:
+            last_started_user = await self.user_manager.get(crawlconfig.lastStartedBy)
+            # pylint: disable=invalid-name
+            if last_started_user:
+                crawlconfig.lastStartedByName = last_started_user.name
+
         if crawlconfig.profileid:
             crawlconfig.profileName = await self.profiles.get_profile_name(
                 crawlconfig.profileid, org
             )
 
-        crawlconfig = await self._annotate_with_crawl_stats(crawlconfig)
-
 
         return crawlconfig
 
     async def get_crawl_config(
@@ -957,6 +877,58 @@ async def inc_crawl_count(crawl_configs, cid: uuid.UUID):
     )
 
 
+# ============================================================================
+# pylint: disable=too-many-locals
+async def update_config_crawl_stats(crawl_configs, crawls, cid: uuid.UUID):
+    """re-calculate and update crawl statistics for config"""
+    update_query = {
+        "crawlCount": 0,
+        "totalSize": 0,
+        "lastCrawlId": None,
+        "lastCrawlStartTime": None,
+        "lastStartedBy": None,
+        "lastCrawlTime": None,
+        "lastCrawlState": None,
+        "lastCrawlSize": None,
+    }
+
+    match_query = {"cid": cid, "finished": {"$ne": None}, "inactive": {"$ne": True}}
+    cursor = crawls.find(match_query).sort("finished", pymongo.DESCENDING)
+    results = await cursor.to_list(length=10_000)
+    if results:
+        update_query["crawlCount"] = len(results)
+
+        last_crawl = results[0]
+        update_query["lastCrawlId"] = str(last_crawl.get("_id"))
+        update_query["lastCrawlStartTime"] = last_crawl.get("started")
+        update_query["lastStartedBy"] = last_crawl.get("userid")
+        update_query["lastCrawlTime"] = last_crawl.get("finished")
+        update_query["lastCrawlState"] = last_crawl.get("state")
+        update_query["lastCrawlSize"] = sum(
+            file_.get("size", 0) for file_ in last_crawl.get("files", [])
+        )
+
+        total_size = 0
+        for res in results:
+            files = res.get("files", [])
+            for file in files:
+                total_size += file.get("size", 0)
+        update_query["totalSize"] = total_size
+
+    result = await crawl_configs.find_one_and_update(
+        {"_id": cid, "inactive": {"$ne": True}},
+        {"$set": update_query},
+        return_document=pymongo.ReturnDocument.AFTER,
+    )
+
+    if not result:
+        raise HTTPException(
+            status_code=404, detail=f"Crawl Config '{cid}' not found to update"
+        )
+
+    return result
+
+
 # ============================================================================
 # pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments
 def init_crawl_config_api(
diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index df1d8f61..bdbeddc3 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -196,6 +196,10 @@ class CrawlOps:
 
         self.presign_duration = int(os.environ.get("PRESIGN_DURATION_SECONDS", 3600))
 
+    async def init_index(self):
+        """init index for crawls db collection"""
+        await self.crawls.create_index([("finished", pymongo.DESCENDING)])
+
     async def list_crawls(
         self,
         org: Optional[Organization] = None,
@@ -380,45 +384,6 @@ class CrawlOps:
 
         return await self._resolve_crawl_refs(crawl, org)
 
-    async def get_latest_crawl_and_count_by_config(self, cid: str):
-        """Get crawl statistics for a crawl_config with id cid."""
-        stats = {
-            "crawl_count": 0,
-            "total_size": 0,
-            "last_crawl_id": None,
-            "last_crawl_started": None,
-            "last_crawl_finished": None,
-            "last_crawl_state": None,
-            "last_started_by": None,
-            "last_crawl_size": 0,
-        }
-
-        match_query = {"cid": cid, "finished": {"$ne": None}, "inactive": {"$ne": True}}
-        cursor = self.crawls.find(match_query).sort("finished", pymongo.DESCENDING)
-        results = await cursor.to_list(length=1000)
-        if results:
-            stats["crawl_count"] = len(results)
-
-            last_crawl = Crawl.from_dict(results[0])
-            stats["last_crawl_id"] = str(last_crawl.id)
-            stats["last_crawl_started"] = last_crawl.started
-            stats["last_crawl_finished"] = last_crawl.finished
-            stats["last_crawl_state"] = last_crawl.state
-            stats["last_crawl_size"] = sum(file_.size for file_ in last_crawl.files)
-
-            user = await self.user_manager.get(last_crawl.userid)
-            if user:
-                stats["last_started_by"] = user.name
-
-            total_size = 0
-            for res in results:
-                files = res["files"]
-                for file in files:
-                    total_size += file["size"]
-            stats["total_size"] = total_size
-
-        return stats
-
     async def _resolve_crawl_refs(
         self,
         crawl: Union[CrawlOut, ListCrawlOut],
@@ -527,13 +492,22 @@ class CrawlOps:
 
     async def delete_crawls(self, org: Organization, delete_list: DeleteCrawlList):
         """Delete a list of crawls by id for given org"""
+        cids_to_update = set()
+
         for crawl_id in delete_list.crawl_ids:
             await self._delete_crawl_files(org, crawl_id)
+            await self.remove_crawl_from_collections(org, crawl_id)
+
+            crawl = await self.get_crawl_raw(crawl_id, org)
+            cids_to_update.add(crawl["cid"])
 
         res = await self.crawls.delete_many(
             {"_id": {"$in": delete_list.crawl_ids}, "oid": org.id}
         )
 
+        for cid in cids_to_update:
+            await self.crawl_configs.update_crawl_stats(cid)
+
         return res.deleted_count
 
     async def _delete_crawl_files(self, org: Organization, crawl_id: str):
@@ -837,7 +811,7 @@ class CrawlOps:
 
         return resp
 
-    async def remove_crawl_from_collections(self, oid: uuid.UUID, crawl_id: str):
+    async def remove_crawl_from_collections(self, org: Organization, crawl_id: str):
         """Remove crawl with given crawl_id from all collections it belongs to"""
         collections = [
             coll["name"]
@@ -845,7 +819,7 @@ class CrawlOps:
         ]
         for collection_name in collections:
             await self.collections.find_one_and_update(
-                {"name": collection_name, "oid": oid},
+                {"name": collection_name, "oid": org.id},
                 {"$pull": {"crawlIds": crawl_id}},
             )
 
@@ -884,7 +858,11 @@ async def add_new_crawl(
 
 # ============================================================================
 async def update_crawl(crawls, crawl_id, **kwargs):
     """update crawl state in db"""
-    await crawls.find_one_and_update({"_id": crawl_id}, {"$set": kwargs})
+    return await crawls.find_one_and_update(
+        {"_id": crawl_id},
+        {"$set": kwargs},
+        return_document=pymongo.ReturnDocument.AFTER,
+    )
 
 
 # ============================================================================
@@ -1056,8 +1034,6 @@ def init_crawls_api(app, mdb, users, crawl_manager, crawl_config_ops, orgs, user
                 status_code=400, detail=f"Error Stopping Crawl: {exc}"
             )
 
-        await ops.remove_crawl_from_collections(crawl.oid, crawl.id)
-
         res = await ops.delete_crawls(org, delete_list)
 
         return {"deleted": res}
diff --git a/backend/btrixcloud/db.py b/backend/btrixcloud/db.py
index 0153401e..29c1cb2b 100644
--- a/backend/btrixcloud/db.py
+++ b/backend/btrixcloud/db.py
@@ -13,7 +13,7 @@ from pymongo.errors import InvalidName
 
 from .migrations import BaseMigration
 
-CURR_DB_VERSION = "0005"
+CURR_DB_VERSION = "0006"
 
 
 # ============================================================================
@@ -55,6 +55,7 @@ async def update_and_prepare_db(
     mdb,
     user_manager,
     org_ops,
+    crawl_ops,
     crawl_config_ops,
     coll_ops,
     invite_ops,
@@ -70,7 +71,7 @@ async def update_and_prepare_db(
     print("Database setup started", flush=True)
     if await run_db_migrations(mdb, user_manager):
         await drop_indexes(mdb)
-        await create_indexes(org_ops, crawl_config_ops, coll_ops, invite_ops)
+        await create_indexes(org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops)
     await user_manager.create_super_user()
     await org_ops.create_default_org()
     print("Database updated and ready", flush=True)
@@ -134,10 +135,11 @@ async def drop_indexes(mdb):
 
 
 # ============================================================================
-async def create_indexes(org_ops, crawl_config_ops, coll_ops, invite_ops):
+async def create_indexes(org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops):
     """Create database indexes."""
     print("Creating database indexes", flush=True)
     await org_ops.init_index()
+    await crawl_ops.init_index()
     await crawl_config_ops.init_index()
     await coll_ops.init_index()
     await invite_ops.init_index()
diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py
index c543563b..dc8a123c 100644
--- a/backend/btrixcloud/main.py
+++ b/backend/btrixcloud/main.py
@@ -114,7 +114,7 @@ def main():
     if run_once_lock("btrix-init-db"):
         asyncio.create_task(
             update_and_prepare_db(
-                mdb, user_manager, org_ops, crawl_config_ops, coll_ops, invites
+                mdb, user_manager, org_ops, crawls, crawl_config_ops, coll_ops, invites
             )
         )
 
diff --git a/backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py b/backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py
new file mode 100644
index 00000000..83f0961d
--- /dev/null
+++ b/backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py
@@ -0,0 +1,31 @@
+"""
+Migration 0006 - Precomputing workflow crawl stats
+"""
+from btrixcloud.crawlconfigs import update_config_crawl_stats
+from btrixcloud.migrations import BaseMigration
+
+
+MIGRATION_VERSION = "0006"
+
+
+class Migration(BaseMigration):
+    """Migration class."""
+
+    def __init__(self, mdb, migration_version=MIGRATION_VERSION):
+        super().__init__(mdb, migration_version)
+
+    async def migrate_up(self):
+        """Perform migration up.
+
+        Add workflow crawl statistics to the database that were previously
+        computed dynamically when needed.
+        """
+        crawl_configs = self.mdb["crawl_configs"]
+        crawls = self.mdb["crawls"]
+
+        configs = [res async for res in crawl_configs.find({})]
+        if not configs:
+            return
+
+        for config in configs:
+            await update_config_crawl_stats(crawl_configs, crawls, config["_id"])
diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py
index 22da514a..5f178917 100644
--- a/backend/btrixcloud/operator.py
+++ b/backend/btrixcloud/operator.py
@@ -19,6 +19,7 @@ from .k8sapi import K8sAPI
 
 from .db import init_db
 from .orgs import inc_org_stats
+from .crawlconfigs import update_config_crawl_stats
 from .crawls import (
     CrawlFile,
     CrawlCompleteIn,
@@ -97,6 +98,7 @@ class BtrixOperator(K8sAPI):
 
         _, mdb = init_db()
         self.crawls = mdb["crawls"]
+        self.crawl_configs = mdb["crawl_configs"]
         self.orgs = mdb["organizations"]
 
         self.done_key = "crawls-done"
@@ -446,7 +448,10 @@ class BtrixOperator(K8sAPI):
         if stats:
             kwargs["stats"] = stats
 
-        await update_crawl(self.crawls, crawl_id, **kwargs)
+        crawl = await update_crawl(self.crawls, crawl_id, **kwargs)
+        crawl_cid = crawl.get("cid")
+
+        await update_config_crawl_stats(self.crawl_configs, self.crawls, crawl_cid)
 
         if redis:
             await self.add_crawl_errors_to_db(redis, crawl_id)
diff --git a/backend/test/test_crawlconfigs.py b/backend/test/test_crawlconfigs.py
index 2d1249ec..d0dcbe09 100644
--- a/backend/test/test_crawlconfigs.py
+++ b/backend/test/test_crawlconfigs.py
@@ -208,7 +208,7 @@ def test_verify_revs_history(crawler_auth_headers, default_org_id):
     assert sorted_data[0]["config"]["scopeType"] == "prefix"
 
 
-def test_workflow_total_size(
+def test_workflow_total_size_and_last_crawl_stats(
     crawler_auth_headers, default_org_id, admin_crawl_id, crawler_crawl_id
 ):
     admin_crawl_cid = ""
@@ -225,8 +225,18 @@
         last_crawl_id = workflow.get("lastCrawlId")
         if last_crawl_id and last_crawl_id in (admin_crawl_id, crawler_crawl_id):
             assert workflow["totalSize"] > 0
+            assert workflow["crawlCount"] > 0
+
+            assert workflow["lastCrawlId"]
+            assert workflow["lastCrawlStartTime"]
+            assert workflow["lastStartedByName"]
+            assert workflow["lastCrawlTime"]
+            assert workflow["lastCrawlState"]
+            assert workflow["lastCrawlSize"] > 0
+
             if last_crawl_id == admin_crawl_id:
                 admin_crawl_cid = workflow["id"]
+                assert admin_crawl_cid
         else:
             assert workflow["totalSize"] == 0
 
@@ -237,3 +247,11 @@
     assert r.status_code == 200
     data = r.json()
     assert data["totalSize"] > 0
+    assert data["crawlCount"] > 0
+
+    assert data["lastCrawlId"]
+    assert data["lastCrawlStartTime"]
+    assert data["lastStartedByName"]
+    assert data["lastCrawlTime"]
+    assert data["lastCrawlState"]
+    assert data["lastCrawlSize"] > 0