Optimize workflow statistics updates (#892)

* optimizations:
- rename update_config_crawl_stats to stats_recompute_all; only used in migrations to fetch all crawls
and do a full recompute of all file sizes
- add stats_recompute_last to fetch only the most recent crawl (and its size), increment the total size by a specified amount, and incr/decr the number of crawls (see the sketch after this list)
- Update migrations 0006 and 0007 to use stats_recompute_all
- Add isCrawlRunning, lastCrawlStopping, and lastRun to
stats_recompute_last
- Increment crawlSuccessfulCount in stats_recompute_last
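
A condensed sketch of the incremental path, derived from the stats_recompute_last function added to crawlconfigs.py in this diff (the full version also resets the remaining last-crawl fields when no finished crawl is left and increments crawlSuccessfulCount); crawl_configs and crawls are assumed to be Motor async collections:

import pymongo

async def stats_recompute_last(crawl_configs, crawls, cid, size, inc_crawls=1):
    """Adjust workflow stats by a signed delta instead of re-aggregating all crawls."""
    # newest finished, non-inactive crawl for this workflow
    last_crawl = await crawls.find_one(
        {"cid": cid, "finished": {"$ne": None}, "inactive": {"$ne": True}},
        sort=[("finished", pymongo.DESCENDING)],
    )

    update = {"isCrawlRunning": False, "lastCrawlStopping": False}
    if last_crawl:
        update["lastCrawlId"] = str(last_crawl["_id"])
        update["lastCrawlState"] = last_crawl.get("state")
        update["lastCrawlTime"] = update["lastRun"] = last_crawl.get("finished")

    # counters move by a delta: +size / +1 when a crawl completes,
    # -size / -1 when one is deleted
    return await crawl_configs.find_one_and_update(
        {"_id": cid, "inactive": {"$ne": True}},
        {"$set": update, "$inc": {"totalSize": size, "crawlCount": inc_crawls}},
    )

The operator drives this with the crawl's filesAddedSize and +1 when a run completes, while the delete path (stats_recompute_remove_crawl) passes -size and -1 to take the removed crawl back out of the totals.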

* operator/crawls:
- operator: keep track of filesAddedSize in redis as well
- rename update_crawl to update_crawl_state_if_changed() and only update
if state is different, otherwise return None (see the sketch after this list)
- ensure mark_finished() operations only occur if crawl state has changed
- don't clear 'stopping' flag, can track if crawl was stopped
- state always starts with "starting", don't reset to starting
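
A minimal sketch of the write-only-if-state-changed pattern, condensed from the update_crawl_state_if_changed() hunk in crawls.py below (the debug print is omitted); crawls is assumed to be a Motor async collection:

import pymongo

async def update_crawl_state_if_changed(crawls, crawl_id, state, **kwargs):
    """Set the new state (plus any extra fields) only if it actually differs."""
    kwargs["state"] = state
    # the state filter turns a repeat update into a no-op; find_one_and_update
    # then returns None, which callers treat as "nothing changed"
    return await crawls.find_one_and_update(
        {"_id": crawl_id, "state": {"$ne": state}},
        {"$set": kwargs},
        return_document=pymongo.ReturnDocument.AFTER,
    )

mark_finished() keys off that return value: if nothing changed, the crawl was already finished and the completion tasks are skipped rather than run twice.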

* tests:
- Add test for incremental workflow stats updating (see the helper sketch after this list)
- don't clear stopping==true, indicates crawl was manually stopped
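
For reference, the new test (shown in full near the end of this diff) polls the crawl's replay endpoint until the second run completes before re-reading the workflow counters; a trimmed helper along those lines, with the API prefix, org id, and auth headers coming from the test fixtures:

import time

import requests

def wait_for_complete(api_prefix, org_id, crawl_id, headers):
    """Poll replay.json until the crawl reports a 'complete' state."""
    while True:
        r = requests.get(
            f"{api_prefix}/orgs/{org_id}/crawls/{crawl_id}/replay.json",
            headers=headers,
        )
        if r.json()["state"] == "complete":
            return
        time.sleep(5)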

---------
Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
Tessa Walsh 2023-05-27 01:57:08 -04:00 committed by GitHub
parent 9c7a312a4c
commit df4c4e6c5a
7 changed files with 218 additions and 66 deletions

View File

@ -658,9 +658,11 @@ class CrawlConfigOps:
return None
async def update_crawl_stats(self, cid: uuid.UUID):
"""Update crawl count, total size, and last crawl information for config."""
result = await update_config_crawl_stats(self.crawl_configs, self.crawls, cid)
async def stats_recompute_remove_crawl(self, cid: uuid.UUID, size: int):
"""Update last crawl, crawl count and total size by removing size of last crawl"""
result = await stats_recompute_last(
self.crawl_configs, self.crawls, cid, -size, -1
)
if not result:
raise HTTPException(
status_code=404, detail=f"Crawl Config '{cid}' not found to update"
@ -943,7 +945,7 @@ async def set_config_current_crawl_info(
# ============================================================================
# pylint: disable=too-many-locals
async def update_config_crawl_stats(crawl_configs, crawls, cid: uuid.UUID):
async def stats_recompute_all(crawl_configs, crawls, cid: uuid.UUID):
"""Re-calculate and update crawl statistics for config.
Should only be called when a crawl completes from operator or on migration
@ -1005,6 +1007,57 @@ async def update_config_crawl_stats(crawl_configs, crawls, cid: uuid.UUID):
return result
# ============================================================================
async def stats_recompute_last(
crawl_configs, crawls, cid: uuid.UUID, size: int, inc_crawls=1
):
"""recompute stats by incrementing size counter and number of crawls"""
update_query = {
"lastCrawlId": None,
"lastCrawlStartTime": None,
"lastStartedBy": None,
"lastCrawlTime": None,
"lastCrawlState": None,
"lastCrawlSize": None,
"lastCrawlStopping": False,
"isCrawlRunning": False,
}
match_query = {"cid": cid, "finished": {"$ne": None}, "inactive": {"$ne": True}}
last_crawl = await crawls.find_one(
match_query, sort=[("finished", pymongo.DESCENDING)]
)
if last_crawl:
last_crawl_finished = last_crawl.get("finished")
update_query["lastCrawlId"] = str(last_crawl.get("_id"))
update_query["lastCrawlStartTime"] = last_crawl.get("started")
update_query["lastStartedBy"] = last_crawl.get("userid")
update_query["lastCrawlTime"] = last_crawl_finished
update_query["lastCrawlState"] = last_crawl.get("state")
update_query["lastCrawlSize"] = sum(
file_.get("size", 0) for file_ in last_crawl.get("files", [])
)
if last_crawl_finished:
update_query["lastRun"] = last_crawl_finished
result = await crawl_configs.find_one_and_update(
{"_id": cid, "inactive": {"$ne": True}},
{
"$set": update_query,
"$inc": {
"totalSize": size,
"crawlCount": inc_crawls,
"crawlSuccessfulCount": inc_crawls,
},
},
)
return result
# ============================================================================
# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments
def init_crawl_config_api(

View File

@ -504,9 +504,10 @@ class CrawlOps:
"""Delete a list of crawls by id for given org"""
cids_to_update = set()
for crawl_id in delete_list.crawl_ids:
await self._delete_crawl_files(org, crawl_id)
size = 0
for crawl_id in delete_list.crawl_ids:
size += await self._delete_crawl_files(org, crawl_id)
crawl = await self.get_crawl_raw(crawl_id, org)
cids_to_update.add(crawl["cid"])
@ -515,7 +516,7 @@ class CrawlOps:
)
for cid in cids_to_update:
await self.crawl_configs.update_crawl_stats(cid)
await self.crawl_configs.stats_recompute_remove_crawl(cid, size)
return res.deleted_count
@ -523,11 +524,15 @@ class CrawlOps:
"""Delete files associated with crawl from storage."""
crawl_raw = await self.get_crawl_raw(crawl_id, org)
crawl = Crawl.from_dict(crawl_raw)
size = 0
for file_ in crawl.files:
size += file_.size
status_code = await delete_crawl_file_object(org, file_, self.crawl_manager)
if status_code != 204:
raise HTTPException(status_code=400, detail="file_deletion_error")
return size
async def get_wacz_files(self, crawl_id: str, org: Organization):
"""Return list of WACZ files associated with crawl."""
wacz_files = []
@ -586,22 +591,6 @@ class CrawlOps:
return True
async def update_crawl_state(self, crawl_id: str, state: str):
"""called only when job container is being stopped/canceled"""
data = {"state": state}
# if cancelation, set the finish time here
if state == "canceled":
data["finished"] = dt_now()
await self.crawls.find_one_and_update(
{
"_id": crawl_id,
"state": {"$in": ["running", "starting", "canceling", "stopping"]},
},
{"$set": data},
)
async def shutdown_crawl(self, crawl_id: str, org: Organization, graceful: bool):
"""stop or cancel specified crawl"""
result = None
@ -625,15 +614,8 @@ class CrawlOps:
status_code=404, detail=f"crawl_not_found, (details: {exc})"
)
# if job no longer running, canceling is considered success,
# but graceful stoppage is not possible, so would be a failure
if result.get("error") == "job_not_running":
if not graceful:
await self.update_crawl_state(crawl_id, "canceled")
return {"success": True}
# return whatever detail may be included in the response
raise HTTPException(status_code=400, detail=result.get("error"))
raise HTTPException(status_code=400, detail=result)
async def _crawl_queue_len(self, redis, key):
try:
@ -898,13 +880,16 @@ async def add_new_crawl(
# ============================================================================
async def update_crawl(crawls, crawl_id, **kwargs):
"""update crawl state in db"""
return await crawls.find_one_and_update(
{"_id": crawl_id},
async def update_crawl_state_if_changed(crawls, crawl_id, state, **kwargs):
"""update crawl state and other properties in db if state has changed"""
kwargs["state"] = state
res = await crawls.find_one_and_update(
{"_id": crawl_id, "state": {"$ne": state}},
{"$set": kwargs},
return_document=pymongo.ReturnDocument.AFTER,
)
print("** UPDATE", crawl_id, state, res is not None)
return res
# ============================================================================

View File

@ -1,7 +1,7 @@
"""
Migration 0006 - Precomputing workflow crawl stats
"""
from btrixcloud.crawlconfigs import update_config_crawl_stats
from btrixcloud.crawlconfigs import stats_recompute_all
from btrixcloud.migrations import BaseMigration
@ -31,7 +31,7 @@ class Migration(BaseMigration):
for config in configs:
config_id = config["_id"]
try:
await update_config_crawl_stats(crawl_configs, crawls, config_id)
await stats_recompute_all(crawl_configs, crawls, config_id)
# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Unable to update workflow {config_id}: {err}", flush=True)

View File

@ -4,7 +4,7 @@ Migration 0007 - Workflows changes
- Rename colls to autoAddCollections
- Re-calculate workflow crawl stats to populate crawlSuccessfulCount
"""
from btrixcloud.crawlconfigs import update_config_crawl_stats
from btrixcloud.crawlconfigs import stats_recompute_all
from btrixcloud.migrations import BaseMigration
@ -31,7 +31,7 @@ class Migration(BaseMigration):
for config in configs:
config_id = config["_id"]
try:
await update_config_crawl_stats(crawl_configs, crawls, config_id)
await stats_recompute_all(crawl_configs, crawls, config_id)
# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Unable to update workflow {config_id}: {err}", flush=True)

View File

@ -14,18 +14,23 @@ import humanize
from pydantic import BaseModel
from redis import asyncio as aioredis
from .utils import from_k8s_date, to_k8s_date, dt_now, get_redis_crawl_stats
from .utils import (
from_k8s_date,
to_k8s_date,
dt_now,
get_redis_crawl_stats,
)
from .k8sapi import K8sAPI
from .db import init_db
from .orgs import inc_org_stats
from .colls import add_successful_crawl_to_collections
from .crawlconfigs import update_config_crawl_stats
from .crawlconfigs import stats_recompute_last
from .crawls import (
CrawlFile,
CrawlCompleteIn,
add_crawl_file,
update_crawl,
update_crawl_state_if_changed,
add_crawl_errors,
SUCCESSFUL_STATES,
)
@ -80,13 +85,16 @@ class CrawlSpec(BaseModel):
class CrawlStatus(BaseModel):
"""status from k8s CrawlJob object"""
state: str = "new"
state: str = "starting"
pagesFound: int = 0
pagesDone: int = 0
size: str = ""
scale: int = 1
filesAdded: int = 0
filesAddedSize: int = 0
finished: Optional[str] = None
stopping: bool = False
# forceRestart: Optional[str]
# ============================================================================
@ -194,8 +202,6 @@ class BtrixOperator(K8sAPI):
if has_crawl_children:
pods = data.related[POD]
status = await self.sync_crawl_state(redis_url, crawl, status, pods)
elif not status.finished:
status.state = "starting"
if status.finished:
return await self.handle_finished_delete_if_needed(crawl_id, status, spec)
@ -319,7 +325,7 @@ class BtrixOperator(K8sAPI):
pvcs = list(related[PVC].keys())
if pvcs:
print("Deleting PVCs", pvcs)
print(f"Deleting PVCs for {crawl_id}", pvcs)
asyncio.create_task(self.delete_pvc(crawl_id))
finalized = False
@ -356,14 +362,15 @@ class BtrixOperator(K8sAPI):
msg = json.loads(file_done)
# add completed file
if msg.get("filename"):
await self.add_file_to_crawl(msg, crawl)
await self.add_file_to_crawl(msg, crawl, redis)
await redis.incr("filesAdded")
# get next file done
file_done = await redis.lpop(self.done_key)
# ensure filesAdded always set
# ensure filesAdded and filesAddedSize always set
status.filesAdded = int(await redis.get("filesAdded") or 0)
status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)
# update stats and get status
return await self.update_crawl_state(redis, crawl, status, pods)
@ -378,7 +385,6 @@ class BtrixOperator(K8sAPI):
"""check if at least one crawler pod has started"""
try:
for pod in pods.values():
print("Phase", pod["status"]["phase"])
if pod["status"]["phase"] == "Running":
return True
# pylint: disable=bare-except
@ -388,7 +394,7 @@ class BtrixOperator(K8sAPI):
return False
async def add_file_to_crawl(self, cc_data, crawl):
async def add_file_to_crawl(self, cc_data, crawl, redis):
"""Handle finished CrawlFile to db"""
filecomplete = CrawlCompleteIn(**cc_data)
@ -408,6 +414,8 @@ class BtrixOperator(K8sAPI):
hash=filecomplete.hash,
)
await redis.incr("filesAddedSize", filecomplete.size)
await add_crawl_file(self.crawls, crawl.id, crawl_file)
return True
@ -436,7 +444,9 @@ class BtrixOperator(K8sAPI):
# otherwise, mark as 'waiting' and return
if not await self.check_if_pods_running(pods):
if status.state not in ("waiting", "canceled"):
await update_crawl(self.crawls, crawl.id, state="waiting")
await update_crawl_state_if_changed(
self.crawls, crawl.id, state="waiting"
)
status.state = "waiting"
return status
@ -445,7 +455,7 @@ class BtrixOperator(K8sAPI):
# will set stats at when crawl is finished, otherwise can read
# directly from redis
if status.state != "running":
await update_crawl(self.crawls, crawl.id, state="running")
await update_crawl_state_if_changed(self.crawls, crawl.id, state="running")
# update status
status.state = "running"
@ -499,28 +509,46 @@ class BtrixOperator(K8sAPI):
self, redis, crawl_id, cid, status, state, crawl=None, stats=None
):
"""mark crawl as finished, set finished timestamp and final state"""
# already marked as finished
if status.state == state:
print("already finished, ignoring mark_finished")
return status
finished = dt_now()
status.state = state
status.finished = to_k8s_date(finished)
if crawl:
await self.inc_crawl_complete_stats(crawl, finished)
kwargs = {"state": state, "finished": finished}
kwargs = {"finished": finished}
if stats:
kwargs["stats"] = stats
await update_crawl(self.crawls, crawl_id, **kwargs)
if not await update_crawl_state_if_changed(
self.crawls, crawl_id, state=state, **kwargs
):
print("already finished, ignoring mark_finished")
return status
asyncio.create_task(self.do_crawl_finished_tasks(redis, crawl_id, cid, state))
if crawl:
await self.inc_crawl_complete_stats(crawl, finished)
asyncio.create_task(
self.do_crawl_finished_tasks(
redis, crawl_id, cid, status.filesAddedSize, state
)
)
return status
# pylint: disable=too-many-arguments
async def do_crawl_finished_tasks(self, redis, crawl_id, cid, state):
async def do_crawl_finished_tasks(
self, redis, crawl_id, cid, files_added_size, state
):
"""Run tasks after crawl completes in asyncio.task coroutine."""
await update_config_crawl_stats(self.crawl_configs, self.crawls, cid)
await stats_recompute_last(
self.crawl_configs, self.crawls, cid, files_added_size, 1
)
if redis:
await self.add_crawl_errors_to_db(redis, crawl_id)

View File

@ -1,3 +1,5 @@
import time
import requests
from .conftest import API_PREFIX
@ -9,6 +11,7 @@ UPDATED_DESCRIPTION = "Updated description"
UPDATED_TAGS = ["tag3", "tag4"]
_coll_id = None
_admin_crawl_cid = None
def test_add_crawl_config(crawler_auth_headers, default_org_id, sample_crawl_data):
@ -231,8 +234,6 @@ def test_verify_revs_history(crawler_auth_headers, default_org_id):
def test_workflow_total_size_and_last_crawl_stats(
crawler_auth_headers, default_org_id, admin_crawl_id, crawler_crawl_id
):
admin_crawl_cid = ""
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs",
headers=crawler_auth_headers,
@ -257,13 +258,14 @@ def test_workflow_total_size_and_last_crawl_stats(
assert workflow["lastCrawlSize"] > 0
if last_crawl_id == admin_crawl_id:
admin_crawl_cid = workflow["id"]
assert admin_crawl_cid
global _admin_crawl_cid
_admin_crawl_cid = workflow["id"]
assert _admin_crawl_cid
else:
assert workflow["totalSize"] == 0
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{admin_crawl_cid}",
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}",
headers=crawler_auth_headers,
)
assert r.status_code == 200
@ -279,3 +281,87 @@ def test_workflow_total_size_and_last_crawl_stats(
assert data["lastCrawlState"]
assert data["lastRun"]
assert data["lastCrawlSize"] > 0
def test_incremental_workflow_total_size_and_last_crawl_stats(
crawler_auth_headers, default_org_id, admin_crawl_id, crawler_crawl_id
):
# Get baseline values
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}",
headers=crawler_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["crawlCount"] == 1
assert data["crawlSuccessfulCount"] == 1
total_size = data["totalSize"]
last_crawl_id = data["lastCrawlId"]
last_crawl_started = data["lastCrawlStartTime"]
last_crawl_finished = data["lastCrawlTime"]
last_run = data["lastRun"]
# Run new crawl in this workflow
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}/run",
headers=crawler_auth_headers,
)
assert r.status_code == 200
crawl_id = r.json()["started"]
# Wait for it to complete
while True:
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/replay.json",
headers=crawler_auth_headers,
)
data = r.json()
if data["state"] == "complete":
break
time.sleep(5)
# Give time for stats to re-compute
time.sleep(10)
# Re-check stats
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}",
headers=crawler_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["crawlCount"] == 2
assert data["crawlSuccessfulCount"] == 2
assert data["totalSize"] > total_size
assert data["lastCrawlId"] == crawl_id
assert data["lastCrawlStartTime"] > last_crawl_started
assert data["lastCrawlTime"] > last_crawl_finished
assert data["lastRun"] > last_run
# Delete new crawl
r = requests.post(
f"{API_PREFIX}/orgs/{default_org_id}/crawls/delete",
headers=crawler_auth_headers,
json={"crawl_ids": [crawl_id]},
)
assert r.status_code == 200
data = r.json()
assert data["deleted"] == 1
# Re-check stats
r = requests.get(
f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{_admin_crawl_cid}",
headers=crawler_auth_headers,
)
assert r.status_code == 200
data = r.json()
assert data["crawlCount"] == 1
assert data["crawlSuccessfulCount"] == 1
assert data["totalSize"] == total_size
assert data["lastCrawlId"] == last_crawl_id
assert data["lastCrawlStartTime"] == last_crawl_started
assert data["lastCrawlTime"] == last_crawl_finished
assert data["lastRun"] == last_run

View File

@ -150,7 +150,7 @@ def test_stop_crawl_partial(
while data["state"] == "running":
data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
assert data["state"] == "partial_complete"
assert data["state"] in ("partial_complete", "complete")
assert data["stopping"] == True
assert len(data["resources"]) == 1