From df4c4e6c5a481a0e3326ed361d482169c604c6ba Mon Sep 17 00:00:00 2001 From: Tessa Walsh Date: Sat, 27 May 2023 01:57:08 -0400 Subject: [PATCH] Optimize workflow statistics updates (#892) * optimizations: - rename update_config_crawl_stats to stats_recompute_all, only used in migration to fetch all crawls and do a full recompute of all file sizes - add stats_recompute_last to only get the last crawl, increment total size by the specified amount, and incr/decr number of crawls - Update migrations 0006 and 0007 to use stats_recompute_all - Add isCrawlRunning, lastCrawlStopping, and lastRun to stats_recompute_last - Increment crawlSuccessfulCount in stats_recompute_last * operator/crawls: - operator: keep track of filesAddedSize in redis as well - rename update_crawl to update_crawl_state_if_changed() and only update if state is different, otherwise return false - ensure mark_finished() operations only occur if crawl state has changed - don't clear 'stopping' flag, can track if crawl was stopped - state always starts with "starting", don't reset to starting tests: - Add test for incremental workflow stats updating - don't clear stopping==true, indicates crawl was manually stopped --------- Co-authored-by: Ilya Kreymer --- backend/btrixcloud/crawlconfigs.py | 61 +++++++++++- backend/btrixcloud/crawls.py | 47 ++++----- .../migration_0006_precompute_crawl_stats.py | 4 +- .../migration_0007_colls_and_config_update.py | 4 +- backend/btrixcloud/operator.py | 70 ++++++++++---- backend/test/test_crawlconfigs.py | 96 ++++++++++++++++++- backend/test/test_stop_cancel_crawl.py | 2 +- 7 files changed, 218 insertions(+), 66 deletions(-) diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py index 534920e3..181d252e 100644 --- a/backend/btrixcloud/crawlconfigs.py +++ b/backend/btrixcloud/crawlconfigs.py @@ -658,9 +658,11 @@ class CrawlConfigOps: return None - async def update_crawl_stats(self, cid: uuid.UUID): - """Update crawl count, total size, and last crawl information for config.""" - result = await update_config_crawl_stats(self.crawl_configs, self.crawls, cid) + async def stats_recompute_remove_crawl(self, cid: uuid.UUID, size: int): + """Update last crawl, crawl count and total size by removing size of last crawl""" + result = await stats_recompute_last( + self.crawl_configs, self.crawls, cid, -size, -1 + ) if not result: raise HTTPException( status_code=404, detail=f"Crawl Config '{cid}' not found to update" @@ -943,7 +945,7 @@ async def set_config_current_crawl_info( # ============================================================================ # pylint: disable=too-many-locals -async def update_config_crawl_stats(crawl_configs, crawls, cid: uuid.UUID): +async def stats_recompute_all(crawl_configs, crawls, cid: uuid.UUID): """Re-calculate and update crawl statistics for config.
Should only be called when a crawl completes from operator or on migration @@ -1005,6 +1007,57 @@ async def update_config_crawl_stats(crawl_configs, crawls, cid: uuid.UUID): return result +# ============================================================================ +async def stats_recompute_last( + crawl_configs, crawls, cid: uuid.UUID, size: int, inc_crawls=1 +): + """recompute stats by incrementing size counter and number of crawls""" + update_query = { + "lastCrawlId": None, + "lastCrawlStartTime": None, + "lastStartedBy": None, + "lastCrawlTime": None, + "lastCrawlState": None, + "lastCrawlSize": None, + "lastCrawlStopping": False, + "isCrawlRunning": False, + } + + match_query = {"cid": cid, "finished": {"$ne": None}, "inactive": {"$ne": True}} + last_crawl = await crawls.find_one( + match_query, sort=[("finished", pymongo.DESCENDING)] + ) + + if last_crawl: + last_crawl_finished = last_crawl.get("finished") + + update_query["lastCrawlId"] = str(last_crawl.get("_id")) + update_query["lastCrawlStartTime"] = last_crawl.get("started") + update_query["lastStartedBy"] = last_crawl.get("userid") + update_query["lastCrawlTime"] = last_crawl_finished + update_query["lastCrawlState"] = last_crawl.get("state") + update_query["lastCrawlSize"] = sum( + file_.get("size", 0) for file_ in last_crawl.get("files", []) + ) + + if last_crawl_finished: + update_query["lastRun"] = last_crawl_finished + + result = await crawl_configs.find_one_and_update( + {"_id": cid, "inactive": {"$ne": True}}, + { + "$set": update_query, + "$inc": { + "totalSize": size, + "crawlCount": inc_crawls, + "crawlSuccessfulCount": inc_crawls, + }, + }, + ) + + return result + + # ============================================================================ # pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments def init_crawl_config_api( diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index 819021b0..e6ae3f77 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -504,9 +504,10 @@ class CrawlOps: """Delete a list of crawls by id for given org""" cids_to_update = set() - for crawl_id in delete_list.crawl_ids: - await self._delete_crawl_files(org, crawl_id) + size = 0 + for crawl_id in delete_list.crawl_ids: + size += await self._delete_crawl_files(org, crawl_id) crawl = await self.get_crawl_raw(crawl_id, org) cids_to_update.add(crawl["cid"]) @@ -515,7 +516,7 @@ class CrawlOps: ) for cid in cids_to_update: - await self.crawl_configs.update_crawl_stats(cid) + await self.crawl_configs.stats_recompute_remove_crawl(cid, size) return res.deleted_count @@ -523,11 +524,15 @@ class CrawlOps: """Delete files associated with crawl from storage.""" crawl_raw = await self.get_crawl_raw(crawl_id, org) crawl = Crawl.from_dict(crawl_raw) + size = 0 for file_ in crawl.files: + size += file_.size status_code = await delete_crawl_file_object(org, file_, self.crawl_manager) if status_code != 204: raise HTTPException(status_code=400, detail="file_deletion_error") + return size + async def get_wacz_files(self, crawl_id: str, org: Organization): """Return list of WACZ files associated with crawl.""" wacz_files = [] @@ -586,22 +591,6 @@ class CrawlOps: return True - async def update_crawl_state(self, crawl_id: str, state: str): - """called only when job container is being stopped/canceled""" - - data = {"state": state} - # if cancelation, set the finish time here - if state == "canceled": - data["finished"] = dt_now() - - await self.crawls.find_one_and_update( - { - "_id": 
crawl_id, - "state": {"$in": ["running", "starting", "canceling", "stopping"]}, - }, - {"$set": data}, - ) - async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool): """stop or cancel specified crawl""" result = None @@ -625,15 +614,8 @@ class CrawlOps: status_code=404, detail=f"crawl_not_found, (details: {exc})" ) - # if job no longer running, canceling is considered success, - # but graceful stoppage is not possible, so would be a failure - if result.get("error") == "job_not_running": - if not graceful: - await self.update_crawl_state(crawl_id, "canceled") - return {"success": True} - # return whatever detail may be included in the response - raise HTTPException(status_code=400, detail=result.get("error")) + raise HTTPException(status_code=400, detail=result) async def _crawl_queue_len(self, redis, key): try: @@ -898,13 +880,16 @@ async def add_new_crawl( # ============================================================================ -async def update_crawl(crawls, crawl_id, **kwargs): - """update crawl state in db""" - return await crawls.find_one_and_update( - {"_id": crawl_id}, +async def update_crawl_state_if_changed(crawls, crawl_id, state, **kwargs): + """update crawl state and other properties in db if state has changed""" + kwargs["state"] = state + res = await crawls.find_one_and_update( + {"_id": crawl_id, "state": {"$ne": state}}, {"$set": kwargs}, return_document=pymongo.ReturnDocument.AFTER, ) + print("** UPDATE", crawl_id, state, res is not None) + return res # ============================================================================ diff --git a/backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py b/backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py index c65d249a..007ca827 100644 --- a/backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py +++ b/backend/btrixcloud/migrations/migration_0006_precompute_crawl_stats.py @@ -1,7 +1,7 @@ """ Migration 0006 - Precomputing workflow crawl stats """ -from btrixcloud.crawlconfigs import update_config_crawl_stats +from btrixcloud.crawlconfigs import stats_recompute_all from btrixcloud.migrations import BaseMigration @@ -31,7 +31,7 @@ class Migration(BaseMigration): for config in configs: config_id = config["_id"] try: - await update_config_crawl_stats(crawl_configs, crawls, config_id) + await stats_recompute_all(crawl_configs, crawls, config_id) # pylint: disable=broad-exception-caught except Exception as err: print(f"Unable to update workflow {config_id}: {err}", flush=True) diff --git a/backend/btrixcloud/migrations/migration_0007_colls_and_config_update.py b/backend/btrixcloud/migrations/migration_0007_colls_and_config_update.py index 5bae2d74..99a2b276 100644 --- a/backend/btrixcloud/migrations/migration_0007_colls_and_config_update.py +++ b/backend/btrixcloud/migrations/migration_0007_colls_and_config_update.py @@ -4,7 +4,7 @@ Migration 0007 - Workflows changes - Rename colls to autoAddCollections - Re-calculate workflow crawl stats to populate crawlSuccessfulCount """ -from btrixcloud.crawlconfigs import update_config_crawl_stats +from btrixcloud.crawlconfigs import stats_recompute_all from btrixcloud.migrations import BaseMigration @@ -31,7 +31,7 @@ class Migration(BaseMigration): for config in configs: config_id = config["_id"] try: - await update_config_crawl_stats(crawl_configs, crawls, config_id) + await stats_recompute_all(crawl_configs, crawls, config_id) # pylint: disable=broad-exception-caught except Exception as err: print(f"Unable to 
update workflow {config_id}: {err}", flush=True) diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py index 79464486..c08227ef 100644 --- a/backend/btrixcloud/operator.py +++ b/backend/btrixcloud/operator.py @@ -14,18 +14,23 @@ import humanize from pydantic import BaseModel from redis import asyncio as aioredis -from .utils import from_k8s_date, to_k8s_date, dt_now, get_redis_crawl_stats +from .utils import ( + from_k8s_date, + to_k8s_date, + dt_now, + get_redis_crawl_stats, +) from .k8sapi import K8sAPI from .db import init_db from .orgs import inc_org_stats from .colls import add_successful_crawl_to_collections -from .crawlconfigs import update_config_crawl_stats +from .crawlconfigs import stats_recompute_last from .crawls import ( CrawlFile, CrawlCompleteIn, add_crawl_file, - update_crawl, + update_crawl_state_if_changed, add_crawl_errors, SUCCESSFUL_STATES, ) @@ -80,13 +85,16 @@ class CrawlSpec(BaseModel): class CrawlStatus(BaseModel): """status from k8s CrawlJob object""" - state: str = "new" + state: str = "starting" pagesFound: int = 0 pagesDone: int = 0 size: str = "" scale: int = 1 filesAdded: int = 0 + filesAddedSize: int = 0 finished: Optional[str] = None + stopping: bool = False + # forceRestart: Optional[str] # ============================================================================ @@ -194,8 +202,6 @@ class BtrixOperator(K8sAPI): if has_crawl_children: pods = data.related[POD] status = await self.sync_crawl_state(redis_url, crawl, status, pods) - elif not status.finished: - status.state = "starting" if status.finished: return await self.handle_finished_delete_if_needed(crawl_id, status, spec) @@ -319,7 +325,7 @@ class BtrixOperator(K8sAPI): pvcs = list(related[PVC].keys()) if pvcs: - print("Deleting PVCs", pvcs) + print(f"Deleting PVCs for {crawl_id}", pvcs) asyncio.create_task(self.delete_pvc(crawl_id)) finalized = False @@ -356,14 +362,15 @@ class BtrixOperator(K8sAPI): msg = json.loads(file_done) # add completed file if msg.get("filename"): - await self.add_file_to_crawl(msg, crawl) + await self.add_file_to_crawl(msg, crawl, redis) await redis.incr("filesAdded") # get next file done file_done = await redis.lpop(self.done_key) - # ensure filesAdded always set + # ensure filesAdded and filesAddedSize always set status.filesAdded = int(await redis.get("filesAdded") or 0) + status.filesAddedSize = int(await redis.get("filesAddedSize") or 0) # update stats and get status return await self.update_crawl_state(redis, crawl, status, pods) @@ -378,7 +385,6 @@ class BtrixOperator(K8sAPI): """check if at least one crawler pod has started""" try: for pod in pods.values(): - print("Phase", pod["status"]["phase"]) if pod["status"]["phase"] == "Running": return True # pylint: disable=bare-except @@ -388,7 +394,7 @@ class BtrixOperator(K8sAPI): return False - async def add_file_to_crawl(self, cc_data, crawl): + async def add_file_to_crawl(self, cc_data, crawl, redis): """Handle finished CrawlFile to db""" filecomplete = CrawlCompleteIn(**cc_data) @@ -408,6 +414,8 @@ class BtrixOperator(K8sAPI): hash=filecomplete.hash, ) + await redis.incr("filesAddedSize", filecomplete.size) + await add_crawl_file(self.crawls, crawl.id, crawl_file) return True @@ -436,7 +444,9 @@ class BtrixOperator(K8sAPI): # otherwise, mark as 'waiting' and return if not await self.check_if_pods_running(pods): if status.state not in ("waiting", "canceled"): - await update_crawl(self.crawls, crawl.id, state="waiting") + await update_crawl_state_if_changed( + self.crawls, crawl.id, 
state="waiting" + ) status.state = "waiting" return status @@ -445,7 +455,7 @@ class BtrixOperator(K8sAPI): # will set stats at when crawl is finished, otherwise can read # directly from redis if status.state != "running": - await update_crawl(self.crawls, crawl.id, state="running") + await update_crawl_state_if_changed(self.crawls, crawl.id, state="running") # update status status.state = "running" @@ -499,28 +509,46 @@ class BtrixOperator(K8sAPI): self, redis, crawl_id, cid, status, state, crawl=None, stats=None ): """mark crawl as finished, set finished timestamp and final state""" + + # already marked as finished + if status.state == state: + print("already finished, ignoring mark_finished") + return status + finished = dt_now() status.state = state status.finished = to_k8s_date(finished) - if crawl: - await self.inc_crawl_complete_stats(crawl, finished) - - kwargs = {"state": state, "finished": finished} + kwargs = {"finished": finished} if stats: kwargs["stats"] = stats - await update_crawl(self.crawls, crawl_id, **kwargs) + if not await update_crawl_state_if_changed( + self.crawls, crawl_id, state=state, **kwargs + ): + print("already finished, ignoring mark_finished") + return status - asyncio.create_task(self.do_crawl_finished_tasks(redis, crawl_id, cid, state)) + if crawl: + await self.inc_crawl_complete_stats(crawl, finished) + + asyncio.create_task( + self.do_crawl_finished_tasks( + redis, crawl_id, cid, status.filesAddedSize, state + ) + ) return status # pylint: disable=too-many-arguments - async def do_crawl_finished_tasks(self, redis, crawl_id, cid, state): + async def do_crawl_finished_tasks( + self, redis, crawl_id, cid, files_added_size, state + ): """Run tasks after crawl completes in asyncio.task coroutine.""" - await update_config_crawl_stats(self.crawl_configs, self.crawls, cid) + await stats_recompute_last( + self.crawl_configs, self.crawls, cid, files_added_size, 1 + ) if redis: await self.add_crawl_errors_to_db(redis, crawl_id) diff --git a/backend/test/test_crawlconfigs.py b/backend/test/test_crawlconfigs.py index 45b7b6b6..ad3a52aa 100644 --- a/backend/test/test_crawlconfigs.py +++ b/backend/test/test_crawlconfigs.py @@ -1,3 +1,5 @@ +import time + import requests from .conftest import API_PREFIX @@ -9,6 +11,7 @@ UPDATED_DESCRIPTION = "Updated description" UPDATED_TAGS = ["tag3", "tag4"] _coll_id = None +_admin_crawl_cid = None def test_add_crawl_config(crawler_auth_headers, default_org_id, sample_crawl_data): @@ -231,8 +234,6 @@ def test_verify_revs_history(crawler_auth_headers, default_org_id): def test_workflow_total_size_and_last_crawl_stats( crawler_auth_headers, default_org_id, admin_crawl_id, crawler_crawl_id ): - admin_crawl_cid = "" - r = requests.get( f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs", headers=crawler_auth_headers, @@ -257,13 +258,14 @@ def test_workflow_total_size_and_last_crawl_stats( assert workflow["lastCrawlSize"] > 0 if last_crawl_id == admin_crawl_id: - admin_crawl_cid = workflow["id"] - assert admin_crawl_cid + global _admin_crawl_cid + _admin_crawl_cid = workflow["id"] + assert _admin_crawl_cid else: assert workflow["totalSize"] == 0 r = requests.get( - f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{admin_crawl_cid}", + f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}", headers=crawler_auth_headers, ) assert r.status_code == 200 @@ -279,3 +281,87 @@ def test_workflow_total_size_and_last_crawl_stats( assert data["lastCrawlState"] assert data["lastRun"] assert data["lastCrawlSize"] > 0 + + +def 
test_incremental_workflow_total_size_and_last_crawl_stats( + crawler_auth_headers, default_org_id, admin_crawl_id, crawler_crawl_id +): + # Get baseline values + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}", + headers=crawler_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + + assert data["crawlCount"] == 1 + assert data["crawlSuccessfulCount"] == 1 + total_size = data["totalSize"] + last_crawl_id = data["lastCrawlId"] + last_crawl_started = data["lastCrawlStartTime"] + last_crawl_finished = data["lastCrawlTime"] + last_run = data["lastRun"] + + # Run new crawl in this workflow + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}/run", + headers=crawler_auth_headers, + ) + assert r.status_code == 200 + crawl_id = r.json()["started"] + + # Wait for it to complete + while True: + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json", + headers=crawler_auth_headers, + ) + data = r.json() + if data["state"] == "complete": + break + time.sleep(5) + + # Give time for stats to re-compute + time.sleep(10) + + # Re-check stats + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}", + headers=crawler_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + + assert data["crawlCount"] == 2 + assert data["crawlSuccessfulCount"] == 2 + assert data["totalSize"] > total_size + assert data["lastCrawlId"] == crawl_id + assert data["lastCrawlStartTime"] > last_crawl_started + assert data["lastCrawlTime"] > last_crawl_finished + assert data["lastRun"] > last_run + + # Delete new crawl + r = requests.post( + f"{API_PREFIX}/orgs/{default_org_id}/crawls/delete", + headers=crawler_auth_headers, + json={"crawl_ids": [crawl_id]}, + ) + assert r.status_code == 200 + data = r.json() + assert data["deleted"] == 1 + + # Re-check stats + r = requests.get( + f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}", + headers=crawler_auth_headers, + ) + assert r.status_code == 200 + data = r.json() + + assert data["crawlCount"] == 1 + assert data["crawlSuccessfulCount"] == 1 + assert data["totalSize"] == total_size + assert data["lastCrawlId"] == last_crawl_id + assert data["lastCrawlStartTime"] == last_crawl_started + assert data["lastCrawlTime"] == last_crawl_finished + assert data["lastRun"] == last_run diff --git a/backend/test/test_stop_cancel_crawl.py b/backend/test/test_stop_cancel_crawl.py index b2a4c53f..b13ef38f 100644 --- a/backend/test/test_stop_cancel_crawl.py +++ b/backend/test/test_stop_cancel_crawl.py @@ -150,7 +150,7 @@ def test_stop_crawl_partial( while data["state"] == "running": data = get_crawl(default_org_id, crawler_auth_headers, crawl_id) - assert data["state"] == "partial_complete" + assert data["state"] in ("partial_complete", "complete") assert data["stopping"] == True assert len(data["resources"]) == 1
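Illustrative sketch (not part of the diff): the patch replaces the per-completion full recompute with a single constant-size write. The snippet below shows the shape of the $set/$inc update that stats_recompute_last issues; the field names come from the crawlconfigs.py hunk above, while the delta and lastCrawlId values are made-up examples.

    # Shape of the workflow update issued by stats_recompute_last (sketch).
    # size_delta and the lastCrawlId value are example values only.
    size_delta = 2 * 1024**3   # bytes added by the crawl that just finished
    inc_crawls = 1             # +1 on completion, -1 when a crawl is deleted

    update = {
        "$set": {
            # last-crawl fields copied from the most recent finished, non-inactive crawl
            "lastCrawlId": "example-crawl-id",
            "lastCrawlState": "complete",
            "lastCrawlSize": size_delta,
            "lastCrawlStopping": False,
            "isCrawlRunning": False,
        },
        "$inc": {
            "totalSize": size_delta,
            "crawlCount": inc_crawls,
            "crawlSuccessfulCount": inc_crawls,
        },
    }
    print(update)

Crawl deletion reuses the same write with negative deltas via stats_recompute_remove_crawl(cid, size), i.e. size=-size, inc_crawls=-1, so the workflow totals stay consistent without a full recompute.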
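The conditional state write is what makes the operator's reconcile loop safe to re-run. Condensed from the crawls.py and operator.py hunks above, with explanatory comments added; apart from the comments the logic is as in the patch.

    import pymongo

    async def update_crawl_state_if_changed(crawls, crawl_id, state, **kwargs):
        """Set state (and any extra fields) only if the state actually changes."""
        kwargs["state"] = state
        # The {"$ne": state} filter turns repeated reconcile passes into no-ops:
        # the first pass matches and updates, later passes match nothing and get None.
        return await crawls.find_one_and_update(
            {"_id": crawl_id, "state": {"$ne": state}},
            {"$set": kwargs},
            return_document=pymongo.ReturnDocument.AFTER,
        )

    # mark_finished() treats a falsy return as "another pass already finished this
    # crawl" and returns early, so do_crawl_finished_tasks (and with it the
    # workflow stats increment) runs exactly once per crawl.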
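filesAddedSize mirrors the existing filesAdded counter: each completed WACZ upload increments a Redis key by the file size, every reconcile pass copies the counters into CrawlStatus, and mark_finished hands the total to do_crawl_finished_tasks as the size delta for the workflow. The fragments below are condensed from the operator.py hunks above and assume the same surrounding variables.

    # add_file_to_crawl(): after recording the CrawlFile, bump the byte counter
    await redis.incr("filesAddedSize", filecomplete.size)  # INCRBY by the file size

    # sync_crawl_state(): keep the status object in sync with the Redis counters
    status.filesAdded = int(await redis.get("filesAdded") or 0)
    status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)

    # do_crawl_finished_tasks(): the accumulated size becomes the workflow increment
    await stats_recompute_last(self.crawl_configs, self.crawls, cid, files_added_size, 1)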