webhook tweak: pass oid to crawl finished and upload finished webhooks (#1287)
Optimizes webhooks by passing oid directly to webhooks: - avoids an extra crawl lookup - handles the case where the crawl is deleted before the webhook is processed via the operator (which would otherwise cause the crawl lookup to fail) - adds more typing to operator and webhooks
This commit is contained in:
parent
6d6fa03ade
commit
dc8d510b11
@ -362,7 +362,9 @@ class BtrixOperator(K8sAPI):
|
|||||||
# pylint: disable=bare-except, broad-except
|
# pylint: disable=bare-except, broad-except
|
||||||
except:
|
except:
|
||||||
# fail crawl if config somehow missing, shouldn't generally happen
|
# fail crawl if config somehow missing, shouldn't generally happen
|
||||||
await self.fail_crawl(crawl_id, uuid.UUID(cid), status, pods)
|
await self.fail_crawl(
|
||||||
|
crawl_id, uuid.UUID(cid), uuid.UUID(oid), status, pods
|
||||||
|
)
|
||||||
|
|
||||||
return self._empty_response(status)
|
return self._empty_response(status)
|
||||||
|
|
||||||
@ -661,7 +663,14 @@ class BtrixOperator(K8sAPI):
|
|||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def cancel_crawl(self, crawl_id, cid, oid, status, pods):
|
async def cancel_crawl(
|
||||||
|
self,
|
||||||
|
crawl_id: str,
|
||||||
|
cid: uuid.UUID,
|
||||||
|
oid: uuid.UUID,
|
||||||
|
status: CrawlStatus,
|
||||||
|
pods: dict,
|
||||||
|
) -> bool:
|
||||||
"""Mark crawl as canceled"""
|
"""Mark crawl as canceled"""
|
||||||
if not await self.mark_finished(crawl_id, cid, oid, status, "canceled"):
|
if not await self.mark_finished(crawl_id, cid, oid, status, "canceled"):
|
||||||
return False
|
return False
|
||||||
@ -698,12 +707,20 @@ class BtrixOperator(K8sAPI):
|
|||||||
|
|
||||||
return status.canceled
|
return status.canceled
|
||||||
|
|
||||||
async def fail_crawl(self, crawl_id, cid, status, pods, stats=None):
|
async def fail_crawl(
|
||||||
|
self,
|
||||||
|
crawl_id: str,
|
||||||
|
cid: uuid.UUID,
|
||||||
|
oid: uuid.UUID,
|
||||||
|
status: CrawlStatus,
|
||||||
|
pods: dict,
|
||||||
|
stats=None,
|
||||||
|
) -> bool:
|
||||||
"""Mark crawl as failed, log crawl state and print crawl logs, if possible"""
|
"""Mark crawl as failed, log crawl state and print crawl logs, if possible"""
|
||||||
prev_state = status.state
|
prev_state = status.state
|
||||||
|
|
||||||
if not await self.mark_finished(
|
if not await self.mark_finished(
|
||||||
crawl_id, cid, None, status, "failed", stats=stats
|
crawl_id, cid, oid, status, "failed", stats=stats
|
||||||
):
|
):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -1138,7 +1155,9 @@ class BtrixOperator(K8sAPI):
|
|||||||
# check if one-page crawls actually succeeded
|
# check if one-page crawls actually succeeded
|
||||||
# if only one page found, and no files, assume failed
|
# if only one page found, and no files, assume failed
|
||||||
if status.pagesFound == 1 and not status.filesAdded:
|
if status.pagesFound == 1 and not status.filesAdded:
|
||||||
await self.fail_crawl(crawl.id, crawl.cid, status, pods, stats)
|
await self.fail_crawl(
|
||||||
|
crawl.id, crawl.cid, crawl.oid, status, pods, stats
|
||||||
|
)
|
||||||
return status
|
return status
|
||||||
|
|
||||||
completed = status.pagesDone and status.pagesDone >= status.pagesFound
|
completed = status.pagesDone and status.pagesDone >= status.pagesFound
|
||||||
@ -1157,7 +1176,9 @@ class BtrixOperator(K8sAPI):
|
|||||||
crawl.id, crawl.cid, crawl.oid, status, "canceled", crawl, stats
|
crawl.id, crawl.cid, crawl.oid, status, "canceled", crawl, stats
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await self.fail_crawl(crawl.id, crawl.cid, status, pods, stats)
|
await self.fail_crawl(
|
||||||
|
crawl.id, crawl.cid, crawl.oid, status, pods, stats
|
||||||
|
)
|
||||||
|
|
||||||
# check for other statuses
|
# check for other statuses
|
||||||
else:
|
else:
|
||||||
@ -1181,8 +1202,15 @@ class BtrixOperator(K8sAPI):
|
|||||||
|
|
||||||
# pylint: disable=too-many-arguments
|
# pylint: disable=too-many-arguments
|
||||||
async def mark_finished(
|
async def mark_finished(
|
||||||
self, crawl_id, cid, oid, status, state, crawl=None, stats=None
|
self,
|
||||||
):
|
crawl_id: str,
|
||||||
|
cid: uuid.UUID,
|
||||||
|
oid: uuid.UUID,
|
||||||
|
status: CrawlStatus,
|
||||||
|
state: str,
|
||||||
|
crawl=None,
|
||||||
|
stats=None,
|
||||||
|
) -> bool:
|
||||||
"""mark crawl as finished, set finished timestamp and final state"""
|
"""mark crawl as finished, set finished timestamp and final state"""
|
||||||
|
|
||||||
finished = dt_now()
|
finished = dt_now()
|
||||||
@ -1192,9 +1220,9 @@ class BtrixOperator(K8sAPI):
|
|||||||
kwargs["stats"] = stats
|
kwargs["stats"] = stats
|
||||||
|
|
||||||
if state in SUCCESSFUL_STATES:
|
if state in SUCCESSFUL_STATES:
|
||||||
allowed_from = RUNNING_STATES
|
allowed_from = list(RUNNING_STATES)
|
||||||
else:
|
else:
|
||||||
allowed_from = RUNNING_AND_STARTING_STATES
|
allowed_from = list(RUNNING_AND_STARTING_STATES)
|
||||||
|
|
||||||
# if set_state returns false, already set to same status, return
|
# if set_state returns false, already set to same status, return
|
||||||
if not await self.set_state(
|
if not await self.set_state(
|
||||||
@ -1221,8 +1249,13 @@ class BtrixOperator(K8sAPI):
|
|||||||
|
|
||||||
# pylint: disable=too-many-arguments
|
# pylint: disable=too-many-arguments
|
||||||
async def do_crawl_finished_tasks(
|
async def do_crawl_finished_tasks(
|
||||||
self, crawl_id, cid, oid, files_added_size, state
|
self,
|
||||||
):
|
crawl_id: str,
|
||||||
|
cid: uuid.UUID,
|
||||||
|
oid: uuid.UUID,
|
||||||
|
files_added_size: int,
|
||||||
|
state: str,
|
||||||
|
) -> None:
|
||||||
"""Run tasks after crawl completes in asyncio.task coroutine."""
|
"""Run tasks after crawl completes in asyncio.task coroutine."""
|
||||||
await self.crawl_config_ops.stats_recompute_last(cid, files_added_size, 1)
|
await self.crawl_config_ops.stats_recompute_last(cid, files_added_size, 1)
|
||||||
|
|
||||||
@ -1230,7 +1263,9 @@ class BtrixOperator(K8sAPI):
|
|||||||
await self.org_ops.inc_org_bytes_stored(oid, files_added_size, "crawl")
|
await self.org_ops.inc_org_bytes_stored(oid, files_added_size, "crawl")
|
||||||
await self.coll_ops.add_successful_crawl_to_collections(crawl_id, cid)
|
await self.coll_ops.add_successful_crawl_to_collections(crawl_id, cid)
|
||||||
|
|
||||||
await self.event_webhook_ops.create_crawl_finished_notification(crawl_id, state)
|
await self.event_webhook_ops.create_crawl_finished_notification(
|
||||||
|
crawl_id, oid, state
|
||||||
|
)
|
||||||
|
|
||||||
# add crawl errors to db
|
# add crawl errors to db
|
||||||
await self.add_crawl_errors_to_db(crawl_id)
|
await self.add_crawl_errors_to_db(crawl_id)
|
||||||
|
|||||||
@ -186,7 +186,7 @@ class UploadOps(BaseCrawlOps):
|
|||||||
)
|
)
|
||||||
|
|
||||||
asyncio.create_task(
|
asyncio.create_task(
|
||||||
self.event_webhook_ops.create_upload_finished_notification(crawl_id)
|
self.event_webhook_ops.create_upload_finished_notification(crawl_id, org.id)
|
||||||
)
|
)
|
||||||
|
|
||||||
quota_reached = await self.orgs.inc_org_bytes_stored(
|
quota_reached = await self.orgs.inc_org_bytes_stored(
|
||||||
|
|||||||
@ -214,10 +214,11 @@ class EventWebhookOps:
|
|||||||
crawl_ids=[crawl_id], coll_id=coll_id, org=org
|
crawl_ids=[crawl_id], coll_id=coll_id, org=org
|
||||||
)
|
)
|
||||||
|
|
||||||
async def create_crawl_finished_notification(self, crawl_id: str, state: str):
|
async def create_crawl_finished_notification(
|
||||||
|
self, crawl_id: str, oid: uuid.UUID, state: str
|
||||||
|
) -> None:
|
||||||
"""Create webhook notification for finished crawl."""
|
"""Create webhook notification for finished crawl."""
|
||||||
crawl_res = await self.crawls.find_one({"_id": crawl_id})
|
org = await self.org_ops.get_org_by_id(oid)
|
||||||
org = await self.org_ops.get_org_by_id(crawl_res["oid"])
|
|
||||||
|
|
||||||
if not org.webhookUrls or not org.webhookUrls.crawlFinished:
|
if not org.webhookUrls or not org.webhookUrls.crawlFinished:
|
||||||
return
|
return
|
||||||
@ -233,10 +234,11 @@ class EventWebhookOps:
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
async def create_upload_finished_notification(self, crawl_id: str):
|
async def create_upload_finished_notification(
|
||||||
|
self, crawl_id: str, oid: uuid.UUID
|
||||||
|
) -> None:
|
||||||
"""Create webhook notification for finished upload."""
|
"""Create webhook notification for finished upload."""
|
||||||
crawl_res = await self.crawls.find_one({"_id": crawl_id})
|
org = await self.org_ops.get_org_by_id(oid)
|
||||||
org = await self.org_ops.get_org_by_id(crawl_res["oid"])
|
|
||||||
|
|
||||||
if not org.webhookUrls or not org.webhookUrls.uploadFinished:
|
if not org.webhookUrls or not org.webhookUrls.uploadFinished:
|
||||||
return
|
return
|
||||||
@ -252,7 +254,7 @@ class EventWebhookOps:
|
|||||||
|
|
||||||
async def create_crawl_started_notification(
|
async def create_crawl_started_notification(
|
||||||
self, crawl_id: str, oid: uuid.UUID, scheduled: bool = False
|
self, crawl_id: str, oid: uuid.UUID, scheduled: bool = False
|
||||||
):
|
) -> None:
|
||||||
"""Create webhook notification for started crawl."""
|
"""Create webhook notification for started crawl."""
|
||||||
org = await self.org_ops.get_org_by_id(oid)
|
org = await self.org_ops.get_org_by_id(oid)
|
||||||
|
|
||||||
@ -318,7 +320,7 @@ class EventWebhookOps:
|
|||||||
crawl_ids: List[str],
|
crawl_ids: List[str],
|
||||||
coll_id: uuid.UUID,
|
coll_id: uuid.UUID,
|
||||||
org: Organization,
|
org: Organization,
|
||||||
):
|
) -> None:
|
||||||
"""Create webhook notification for item added to collection"""
|
"""Create webhook notification for item added to collection"""
|
||||||
if not org.webhookUrls or not org.webhookUrls.addedToCollection:
|
if not org.webhookUrls or not org.webhookUrls.addedToCollection:
|
||||||
return
|
return
|
||||||
@ -339,7 +341,7 @@ class EventWebhookOps:
|
|||||||
crawl_ids: List[str],
|
crawl_ids: List[str],
|
||||||
coll_id: uuid.UUID,
|
coll_id: uuid.UUID,
|
||||||
org: Organization,
|
org: Organization,
|
||||||
):
|
) -> None:
|
||||||
"""Create webhook notification for item removed from collection"""
|
"""Create webhook notification for item removed from collection"""
|
||||||
if not org.webhookUrls or not org.webhookUrls.removedFromCollection:
|
if not org.webhookUrls or not org.webhookUrls.removedFromCollection:
|
||||||
return
|
return
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user