| """ CrawlOperator """
 | |
| 
 | |
| import traceback
 | |
| import os
 | |
| from pprint import pprint
 | |
| from typing import Optional
 | |
| 
 | |
| import json
 | |
| from uuid import UUID
 | |
| from fastapi import HTTPException
 | |
| 
 | |
| import humanize
 | |
| 
 | |
| from kubernetes.utils import parse_quantity
 | |
| from redis import asyncio as exceptions
 | |
| 
 | |
| from btrixcloud.models import (
 | |
|     NON_RUNNING_STATES,
 | |
|     RUNNING_STATES,
 | |
|     RUNNING_AND_STARTING_ONLY,
 | |
|     RUNNING_AND_STARTING_STATES,
 | |
|     SUCCESSFUL_STATES,
 | |
|     FAILED_STATES,
 | |
|     CrawlFile,
 | |
|     CrawlCompleteIn,
 | |
|     StorageRef,
 | |
| )
 | |
| 
 | |
| from btrixcloud.utils import (
 | |
|     from_k8s_date,
 | |
|     to_k8s_date,
 | |
|     dt_now,
 | |
| )
 | |
| 
 | |
| from .baseoperator import BaseOperator, Redis
 | |
| from .models import (
 | |
|     CrawlSpec,
 | |
|     CrawlStatus,
 | |
|     MCBaseRequest,
 | |
|     MCSyncData,
 | |
|     POD,
 | |
|     CMAP,
 | |
|     PVC,
 | |
|     CJS,
 | |
|     BTRIX_API,
 | |
| )
 | |
| 
 | |
| 
 | |
| METRICS_API = "metrics.k8s.io/v1beta1"
 | |
| METRICS = f"PodMetrics.{METRICS_API}"
 | |
| 
 | |
| DEFAULT_TTL = 30
 | |
| 
 | |
| REDIS_TTL = 60
 | |
| 
 | |
| # time in seconds before a crawl is deemed 'waiting' instead of 'starting'
 | |
| STARTING_TIME_SECS = 60
 | |
| 
 | |
| # how often to update execution time seconds
 | |
| EXEC_TIME_UPDATE_SECS = 60
 | |
| 
 | |
| 
 | |
| # pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements
 | |
| # pylint: disable=invalid-name, too-many-lines, too-many-return-statements
 | |
| # ============================================================================
 | |
| class CrawlOperator(BaseOperator):
 | |
|     """CrawlOperator Handler"""
 | |
| 
 | |
|     def __init__(self, *args):
 | |
|         super().__init__(*args)
 | |
| 
 | |
|         self.done_key = "crawls-done"
 | |
|         self.pages_key = "pages"
 | |
|         self.errors_key = "e"
 | |
| 
 | |
|         self.fast_retry_secs = int(os.environ.get("FAST_RETRY_SECS") or 0)
 | |
| 
 | |
|         self.log_failed_crawl_lines = int(os.environ.get("LOG_FAILED_CRAWL_LINES") or 0)
 | |
| 
 | |
|     def init_routes(self, app):
 | |
|         """init routes for this operator"""
 | |
| 
 | |
|         @app.post("/op/crawls/sync")
 | |
|         async def mc_sync_crawls(data: MCSyncData):
 | |
|             return await self.sync_crawls(data)
 | |
| 
 | |
|         # reuse sync path, but distinct endpoint for better logging
 | |
|         @app.post("/op/crawls/finalize")
 | |
|         async def mc_sync_finalize(data: MCSyncData):
 | |
|             return await self.sync_crawls(data)
 | |
| 
 | |
|         @app.post("/op/crawls/customize")
 | |
|         async def mc_related(data: MCBaseRequest):
 | |
|             return self.get_related(data)
 | |
| 
 | |
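    # The three routes above are the operator's webhook hooks (the MC* request
    # models follow the Metacontroller-style sync/finalize/customize protocol):
    # /sync and /finalize receive the parent CrawlJob plus its current children
    # and return the desired children + status, while /customize lists the
    # related resources to watch (configmaps, other crawljobs, pod metrics).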
    async def sync_crawls(self, data: MCSyncData):
        """sync crawls"""

        status = CrawlStatus(**data.parent.get("status", {}))

        spec = data.parent.get("spec", {})
        crawl_id = spec["id"]
        cid = spec["cid"]
        oid = spec["oid"]

        redis_url = self.k8s.get_redis_url(crawl_id)

        params = {}
        params.update(self.k8s.shared_params)
        params["id"] = crawl_id
        params["cid"] = cid
        params["userid"] = spec.get("userid", "")

        pods = data.children[POD]

        # if finalizing, crawl is being deleted
        if data.finalizing:
            if not status.finished:
                # if can't cancel, already finished
                if not await self.cancel_crawl(
                    crawl_id, UUID(cid), UUID(oid), status, data.children[POD]
                ):
                    # instead of fetching the state (that was already set)
                    # return exception to ignore this request, keep previous
                    # finished state
                    raise HTTPException(status_code=400, detail="out_of_sync_status")

            return await self.finalize_response(
                crawl_id,
                UUID(oid),
                status,
                spec,
                data.children,
                params,
            )

        # just in case, finished but not deleted, can only get here if
        # do_crawl_finished_tasks() doesn't reach the end or taking too long
        if status.finished:
            print(
                f"warn crawl {crawl_id} finished but not deleted, post-finish taking too long?"
            )
            self.run_task(self.k8s.delete_crawl_job(crawl_id))
            return await self.finalize_response(
                crawl_id,
                UUID(oid),
                status,
                spec,
                data.children,
                params,
            )

        try:
            configmap = data.related[CMAP][f"crawl-config-{cid}"]["data"]
        # pylint: disable=bare-except, broad-except
        except:
            # fail crawl if config somehow missing, shouldn't generally happen
            await self.fail_crawl(crawl_id, UUID(cid), UUID(oid), status, pods)

            return self._empty_response(status)

        crawl = CrawlSpec(
            id=crawl_id,
            cid=cid,
            oid=oid,
            storage=StorageRef(spec["storageName"]),
            crawler_channel=spec.get("crawlerChannel"),
            scale=spec.get("scale", 1),
            started=data.parent["metadata"]["creationTimestamp"],
            stopping=spec.get("stopping", False),
            timeout=spec.get("timeout") or 0,
            max_crawl_size=int(spec.get("maxCrawlSize") or 0),
            scheduled=spec.get("manual") != "1",
        )

        # shouldn't get here, crawl should already be finalizing when canceled
        # just in case, handle canceled-but-not-finalizing here
        if status.state == "canceled":
            await self.k8s.delete_crawl_job(crawl.id)
            return {"status": status.dict(exclude_none=True), "children": []}

        # first, check storage quota, and fail immediately if quota reached
        if status.state in ("starting", "skipped_quota_reached"):
            # only check on very first run, before any pods/pvcs created
            # for now, allow if crawl has already started (pods/pvcs created)
            if (
                not pods
                and not data.children[PVC]
                and await self.org_ops.storage_quota_reached(crawl.oid)
            ):
                await self.mark_finished(
                    crawl.id, crawl.cid, crawl.oid, status, "skipped_quota_reached"
                )
                return self._empty_response(status)

        if status.state in ("starting", "waiting_org_limit"):
            if not await self.can_start_new(crawl, data, status):
                return self._empty_response(status)

            await self.set_state(
                "starting", status, crawl.id, allowed_from=["waiting_org_limit"]
            )

        if len(pods):
            for pod_name, pod in pods.items():
                self.sync_resources(status, pod_name, pod, data.children)

            status = await self.sync_crawl_state(
                redis_url,
                crawl,
                status,
                pods,
                data.related.get(METRICS, {}),
            )

            # auto sizing handled here
            self.handle_auto_size(crawl.id, status.podStatus)

            if status.finished:
                return await self.finalize_response(
                    crawl_id,
                    UUID(oid),
                    status,
                    spec,
                    data.children,
                    params,
                )

            await self.increment_pod_exec_time(
                pods, status, crawl.id, crawl.oid, EXEC_TIME_UPDATE_SECS
            )

        else:
            status.scale = crawl.scale
            now = dt_now()
            await self.crawl_ops.inc_crawl_exec_time(crawl_id, 0, now)
            status.lastUpdatedTime = to_k8s_date(now)

        children = self._load_redis(params, status, data.children)

        storage_path = crawl.storage.get_storage_extra_path(oid)
        storage_secret = crawl.storage.get_storage_secret_name(oid)

        params["storage_path"] = storage_path
        params["storage_secret"] = storage_secret
        params["profile_filename"] = configmap["PROFILE_FILENAME"]

        # only resolve if not already set
        # not automatically updating image for existing crawls
        if not status.crawlerImage:
            status.crawlerImage = self.crawl_config_ops.get_channel_crawler_image(
                crawl.crawler_channel
            )

        params["crawler_image"] = status.crawlerImage

        params["storage_filename"] = configmap["STORE_FILENAME"]
        params["restart_time"] = spec.get("restartTime")

        params["warc_prefix"] = spec.get("warcPrefix")

        params["redis_url"] = redis_url

        if spec.get("restartTime") != status.restartTime:
            # pylint: disable=invalid-name
            status.restartTime = spec.get("restartTime")
            status.resync_after = self.fast_retry_secs
            params["force_restart"] = True
        else:
            params["force_restart"] = False

        for i in range(0, status.scale):
            children.extend(self._load_crawler(params, i, status, data.children))

        return {
            "status": status.dict(exclude_none=True),
            "children": children,
            "resyncAfterSeconds": status.resync_after,
        }

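    # The redis and crawler children below are rendered from the operator's
    # redis.yaml and crawler.yaml templates; any per-pod newCpu/newMemory values
    # set during auto-sizing take precedence over the default resource params.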
    def _load_redis(self, params, status, children):
        name = f"redis-{params['id']}"
        has_pod = name in children[POD]

        pod_info = status.podStatus[name]
        params["name"] = name
        params["cpu"] = pod_info.newCpu or params.get("redis_cpu")
        params["memory"] = pod_info.newMemory or params.get("redis_memory")
        restart = pod_info.should_restart_pod() and has_pod
        if restart:
            print(f"Restart {name}")

        params["init_redis"] = status.initRedis and not restart

        return self.load_from_yaml("redis.yaml", params)

    def _load_crawler(self, params, i, status, children):
        name = f"crawl-{params['id']}-{i}"
        has_pod = name in children[POD]

        pod_info = status.podStatus[name]
        params["name"] = name
        params["cpu"] = pod_info.newCpu or params.get("crawler_cpu")
        params["memory"] = pod_info.newMemory or params.get("crawler_memory")
        params["do_restart"] = (
            pod_info.should_restart_pod() or params.get("force_restart")
        ) and has_pod
        if params.get("do_restart"):
            print(f"Restart {name}")

        params["priorityClassName"] = f"crawl-instance-{i}"

        return self.load_from_yaml("crawler.yaml", params)

    # pylint: disable=too-many-arguments
    async def _resolve_scale(
        self,
        crawl_id: str,
        desired_scale: int,
        redis: Redis,
        status: CrawlStatus,
        pods: dict[str, dict],
    ):
        """Resolve scale
        If desired_scale >= actual scale, just set (also limit by number of pages
        found).
        If desired scale < actual scale, attempt to shut down each crawl instance
        via redis setting. If contiguous instances shutdown (successful exit), lower
        scale and clean up previous scale state.
        """

        # actual scale (minus redis pod)
        actual_scale = len(pods)
        if pods.get(f"redis-{crawl_id}"):
            actual_scale -= 1

        # ensure at least enough pages for the scale
        if status.pagesFound and status.pagesFound < desired_scale:
            desired_scale = status.pagesFound

        # if desired_scale same or scaled up, return desired_scale
        if desired_scale >= actual_scale:
            return desired_scale

        new_scale = actual_scale
        for i in range(actual_scale - 1, desired_scale - 1, -1):
            name = f"crawl-{crawl_id}-{i}"
            pod = pods.get(name)
            if pod:
                print(f"Attempting scaling down of pod {i}")
                await redis.hset(f"{crawl_id}:stopone", name, "1")

            # check if this pod can be scaled down
            if new_scale == i + 1:
                # if status key doesn't exist, this pod never actually ran, so just scale down
                if not await redis.hexists(f"{crawl_id}:status", name):
                    new_scale = i
                    print(f"Scaled down pod index {i + 1} -> {i}, no previous pod")

                elif pod and pod["status"].get("phase") == "Succeeded":
                    new_scale = i
                    print(f"Scaled down pod index {i + 1} -> {i}, pod completed")

        if new_scale < actual_scale:
            for i in range(new_scale, actual_scale):
                name = f"crawl-{crawl_id}-{i}"
                await redis.hdel(f"{crawl_id}:stopone", name)
                await redis.hdel(f"{crawl_id}:status", name)

        return new_scale

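    # Scale-down protocol: a pod is asked to stop via the "<crawl_id>:stopone"
    # hash; the reported scale is only lowered once the highest-index pod has
    # exited successfully (or never ran), after which its ":stopone" and
    # ":status" entries are cleared.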
    def sync_resources(self, status, name, pod, children):
        """set crawljob status from current resources"""
        resources = status.podStatus[name].allocated

        src = pod["spec"]["containers"][0]["resources"]["requests"]
        resources.memory = int(parse_quantity(src.get("memory")))
        resources.cpu = float(parse_quantity(src.get("cpu")))

        pvc = children[PVC].get(name)
        if pvc:
            src = pvc["spec"]["resources"]["requests"]
            resources.storage = int(parse_quantity(src.get("storage")))

    async def set_state(self, state, status, crawl_id, allowed_from, **kwargs):
        """set status state and update db, if changed
        if allowed_from passed in, can only transition from allowed_from state,
        otherwise get current state from db and return
        the following state transitions are supported:

        from starting to org concurrent crawl limit and back:
         - starting -> waiting_org_limit -> starting

        from starting to running:
         - starting -> running

        from running to complete or complete[:stopReason]:
         - running -> complete[:stopReason]
         - running -> complete

        from starting or running to waiting for capacity (pods pending) and back:
         - starting -> waiting_capacity
         - running -> waiting_capacity
         - waiting_capacity -> running

        from any state to canceled or failed:
         - not complete[:stopReason] -> canceled
         - not complete[:stopReason] -> failed
        """
        if not allowed_from or status.state in allowed_from:
            res = await self.crawl_ops.update_crawl_state_if_allowed(
                crawl_id, state=state, allowed_from=allowed_from, **kwargs
            )
            if res:
                print(f"Setting state: {status.state} -> {state}, {crawl_id}")
                status.state = state
                return True

            # get actual crawl state
            actual_state, finished = await self.crawl_ops.get_crawl_state(crawl_id)
            if actual_state:
                status.state = actual_state
            if finished:
                status.finished = to_k8s_date(finished)

            if actual_state != state:
                print(f"state mismatch, actual state {actual_state}, requested {state}")
                if not actual_state and state == "canceled":
                    return True

        if status.state != state:
            print(
                f"Not setting state: {status.state} -> {state}, {crawl_id} not allowed"
            )
        return False

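    # set_state defers the transition to the db via update_crawl_state_if_allowed;
    # if the db rejects it (state already changed elsewhere), the actual state and
    # finished time are read back so the CrawlJob status stays consistent with
    # the db.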
    def get_related(self, data: MCBaseRequest):
        """return objects related to crawl pods"""
        spec = data.parent.get("spec", {})
        cid = spec["cid"]
        crawl_id = spec["id"]
        oid = spec.get("oid")
        related_resources = [
            {
                "apiVersion": "v1",
                "resource": "configmaps",
                "labelSelector": {"matchLabels": {"btrix.crawlconfig": cid}},
            },
            {
                "apiVersion": BTRIX_API,
                "resource": "crawljobs",
                "labelSelector": {"matchLabels": {"btrix.org": oid}},
            },
        ]

        if self.k8s.has_pod_metrics:
            related_resources.append(
                {
                    "apiVersion": METRICS_API,
                    "resource": "pods",
                    "labelSelector": {"matchLabels": {"crawl": crawl_id}},
                }
            )

        return {"relatedResources": related_resources}

    async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status):
        """return true if crawl can start, otherwise set crawl to 'queued' state
        until more crawls for org finish"""
        max_crawls = await self.org_ops.get_max_concurrent_crawls(crawl.oid)
        if not max_crawls:
            return True

        if len(data.related[CJS]) <= max_crawls:
            return True

        name = data.parent.get("metadata", {}).get("name")

        # def metadata_key(val):
        #    return val.get("metadata").get("creationTimestamp")

        # all_crawljobs = sorted(data.related[CJS].values(), key=metadata_key)
        # print(list(data.related[CJS].keys()))

        i = 0
        for crawl_sorted in data.related[CJS].values():
            if crawl_sorted.get("status", {}).get("state") in NON_RUNNING_STATES:
                continue

            # print(i, crawl_sorted.get("metadata").get("name"))
            if crawl_sorted.get("metadata").get("name") == name:
                # print("found: ", name, "index", i)
                if i < max_crawls:
                    return True

                break
            i += 1

        await self.set_state(
            "waiting_org_limit", status, crawl.id, allowed_from=["starting"]
        )
        return False

    async def cancel_crawl(
        self,
        crawl_id: str,
        cid: UUID,
        oid: UUID,
        status: CrawlStatus,
        pods: dict,
    ) -> bool:
        """Mark crawl as canceled"""
        if not await self.mark_finished(crawl_id, cid, oid, status, "canceled"):
            return False

        await self.mark_for_cancelation(crawl_id)

        if not status.canceled:
            for name, pod in pods.items():
                pstatus = pod["status"]
                role = pod["metadata"]["labels"]["role"]

                if role != "crawler":
                    continue

                if "containerStatuses" not in pstatus:
                    continue

                cstatus = pstatus["containerStatuses"][0]

                self.handle_terminated_pod(
                    name, role, status, cstatus["state"].get("terminated")
                )

            status.canceled = True

        return status.canceled

    async def fail_crawl(
        self,
        crawl_id: str,
        cid: UUID,
        oid: UUID,
        status: CrawlStatus,
        pods: dict,
        stats=None,
    ) -> bool:
        """Mark crawl as failed, log crawl state and print crawl logs, if possible"""
        prev_state = status.state

        if not await self.mark_finished(
            crawl_id, cid, oid, status, "failed", stats=stats
        ):
            return False

        if not self.log_failed_crawl_lines or prev_state == "failed":
            return True

        pod_names = list(pods.keys())

        for name in pod_names:
            print(f"============== POD STATUS: {name} ==============")
            pprint(pods[name]["status"])

        self.run_task(self.k8s.print_pod_logs(pod_names, self.log_failed_crawl_lines))

        return True

    def _empty_response(self, status):
        """done response for removing crawl"""
        return {
            "status": status.dict(exclude_none=True),
            "children": [],
        }

    async def finalize_response(
        self,
        crawl_id: str,
        oid: UUID,
        status: CrawlStatus,
        spec: dict,
        children: dict,
        params: dict,
    ):
        """ensure crawl id ready for deletion"""

        redis_pod = f"redis-{crawl_id}"
        new_children = []

        finalized = False

        pods = children[POD]

        if redis_pod in pods:
            # if has other pods, keep redis pod until they are removed
            if len(pods) > 1:
                new_children = self._load_redis(params, status, children)
                await self.increment_pod_exec_time(pods, status, crawl_id, oid)

        # keep pvs until pods are removed
        if new_children:
            new_children.extend(list(children[PVC].values()))

        if not children[POD] and not children[PVC]:
            # keep parent until ttl expired, if any
            if status.finished:
                ttl = spec.get("ttlSecondsAfterFinished", DEFAULT_TTL)
                finished = from_k8s_date(status.finished)
                if (dt_now() - finished).total_seconds() > ttl > 0:
                    print("CrawlJob expired, deleting: " + crawl_id)
                    finalized = True
            else:
                finalized = True

        return {
            "status": status.dict(exclude_none=True),
            "children": new_children,
            "finalized": finalized,
        }

    async def _get_redis(self, redis_url: str) -> Optional[Redis]:
        """init redis, ensure connectivity"""
        redis = None
        try:
            redis = await self.k8s.get_redis_client(redis_url)
            # test connection
            await redis.ping()
            return redis

        # pylint: disable=bare-except
        except:
            if redis:
                await redis.close()

            return None

    async def sync_crawl_state(
        self,
        redis_url: str,
        crawl: CrawlSpec,
        status: CrawlStatus,
        pods: dict[str, dict],
        metrics: Optional[dict],
    ):
        """sync crawl state for running crawl"""
        # check if at least one crawler pod started running
        crawler_running, redis_running, done = self.sync_pod_status(pods, status)
        redis = None

        try:
            if redis_running:
                redis = await self._get_redis(redis_url)

            await self.add_used_stats(crawl.id, status.podStatus, redis, metrics)

            # skip if no newly exited pods
            if status.anyCrawlPodNewExit:
                await self.log_crashes(crawl.id, status.podStatus, redis)

            if not crawler_running or not redis:
                # if either crawler is not running or redis is inaccessible
                if self.should_mark_waiting(status.state, crawl.started):
                    # mark as waiting (if already running)
                    await self.set_state(
                        "waiting_capacity",
                        status,
                        crawl.id,
                        allowed_from=RUNNING_AND_STARTING_ONLY,
                    )

                if not crawler_running and redis:
                    # no crawler pods are running but redis still is: pause
                    # redis once inactive for more than REDIS_TTL seconds
                    if status.lastActiveTime and (
                        (
                            dt_now() - from_k8s_date(status.lastActiveTime)
                        ).total_seconds()
                        > REDIS_TTL
                    ):
                        print(
                            f"Pausing redis, no running crawler pods for >{REDIS_TTL} secs"
                        )
                        status.initRedis = False
                elif crawler_running and not redis:
                    # if crawler is running, but no redis, init redis
                    status.initRedis = True
                    status.lastActiveTime = to_k8s_date(dt_now())

                # if no crawler / no redis, resync after N seconds
                status.resync_after = self.fast_retry_secs
                return status

            # set state to running (if not already)
            if status.state not in RUNNING_STATES:
                # if true (state is set), also run webhook
                if await self.set_state(
                    "running",
                    status,
                    crawl.id,
                    allowed_from=["starting", "waiting_capacity"],
                ):
                    self.run_task(
                        self.event_webhook_ops.create_crawl_started_notification(
                            crawl.id, crawl.oid, scheduled=crawl.scheduled
                        )
                    )

            file_done = await redis.lpop(self.done_key)
            while file_done:
                msg = json.loads(file_done)
                # add completed file
                if msg.get("filename"):
                    await self.add_file_to_crawl(msg, crawl, redis)
                    await redis.incr("filesAdded")

                # get next file done
                file_done = await redis.lpop(self.done_key)

            page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}")
            while page_crawled:
                page_dict = json.loads(page_crawled)
                await self.page_ops.add_page_to_db(page_dict, crawl.id, crawl.oid)
                page_crawled = await redis.lpop(f"{crawl.id}:{self.pages_key}")

            crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}")
            while crawl_error:
                await self.crawl_ops.add_crawl_error(crawl.id, crawl_error)
                crawl_error = await redis.lpop(f"{crawl.id}:{self.errors_key}")

            # ensure filesAdded and filesAddedSize always set
            status.filesAdded = int(await redis.get("filesAdded") or 0)
            status.filesAddedSize = int(await redis.get("filesAddedSize") or 0)

            # update stats and get status
            return await self.update_crawl_state(redis, crawl, status, pods, done)

        # pylint: disable=broad-except
        except Exception as exc:
            traceback.print_exc()
            print(f"Crawl get failed: {exc}, will try again")
            return status

        finally:
            if redis:
                await redis.close()

    def sync_pod_status(self, pods: dict[str, dict], status: CrawlStatus):
        """check status of pods"""
        crawler_running = False
        redis_running = False
        done = True

        try:
            for name, pod in pods.items():
                running = False

                pstatus = pod["status"]
                phase = pstatus["phase"]
                role = pod["metadata"]["labels"]["role"]

                if phase in ("Running", "Succeeded"):
                    running = True

                if "containerStatuses" in pstatus:
                    cstatus = pstatus["containerStatuses"][0]

                    # consider 'ContainerCreating' as running
                    waiting = cstatus["state"].get("waiting")
                    if (
                        phase == "Pending"
                        and waiting
                        and waiting.get("reason") == "ContainerCreating"
                    ):
                        running = True

                    self.handle_terminated_pod(
                        name, role, status, cstatus["state"].get("terminated")
                    )

                if role == "crawler":
                    crawler_running = crawler_running or running
                    done = done and phase == "Succeeded"
                elif role == "redis":
                    redis_running = redis_running or running

        # pylint: disable=broad-except
        except Exception as exc:
            done = False
            print(exc)

        return crawler_running, redis_running, done

    def handle_terminated_pod(self, name, role, status, terminated):
        """handle terminated pod state"""
        if not terminated:
            return

        exit_time = terminated.get("finishedAt")
        if not exit_time:
            print("warn: terminated pod missing finishedAt", flush=True)
            return

        pod_status = status.podStatus[name]

        pod_status.isNewExit = pod_status.exitTime != exit_time
        if pod_status.isNewExit and role == "crawler":
            pod_status.exitTime = exit_time
            status.anyCrawlPodNewExit = True

        # detect reason
        exit_code = terminated.get("exitCode")

        if exit_code == 0:
            pod_status.reason = "done"
        elif terminated.get("reason") == "OOMKilled" or exit_code == 137:
            pod_status.reason = "oom"
        else:
            pod_status.reason = "interrupt: " + str(exit_code)

        pod_status.exitCode = exit_code

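    # Execution time accounting: each update sums the time every crawler
    # container has run since the last recorded update (lastUpdatedTime).
    # crawlExecTime accumulates the total across all crawler pods, while
    # elapsedCrawlTime only adds the longest single-pod duration. Updates are
    # triggered by a pod exit, cancellation, a month rollover, or once
    # min_duration (EXEC_TIME_UPDATE_SECS during a running crawl) has elapsed.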
    async def increment_pod_exec_time(
        self,
        pods: dict[str, dict],
        status: CrawlStatus,
        crawl_id: str,
        oid: UUID,
        min_duration=0,
    ) -> None:
        """inc exec time tracking"""
        now = dt_now()

        update_start_time = await self.crawl_ops.get_crawl_exec_last_update_time(
            crawl_id
        )

        if not update_start_time:
            await self.crawl_ops.inc_crawl_exec_time(crawl_id, 0, now)
            status.lastUpdatedTime = to_k8s_date(now)
            return

        reason = None
        update_duration = (now - update_start_time).total_seconds()

        if status.anyCrawlPodNewExit:
            reason = "new pod exit"

        elif status.canceled:
            reason = "crawl canceled"

        elif now.month != update_start_time.month:
            reason = "month change"

        elif update_duration >= min_duration:
            reason = "duration reached" if min_duration else "finalizing"

        if not reason:
            return

        exec_time = 0
        max_duration = 0
        print(
            f"Exec Time Update: {reason}: {now} - {update_start_time} = {update_duration}"
        )

        for name, pod in pods.items():
            pstatus = pod["status"]
            role = pod["metadata"]["labels"]["role"]

            if role != "crawler":
                continue

            if "containerStatuses" not in pstatus:
                continue

            cstate = pstatus["containerStatuses"][0]["state"]

            end_time = None
            start_time = None
            pod_state = ""

            if "running" in cstate:
                pod_state = "running"
                state = cstate["running"]
                start_time = from_k8s_date(state.get("startedAt"))
                if update_start_time and update_start_time > start_time:
                    start_time = update_start_time

                end_time = now
            elif "terminated" in cstate:
                pod_state = "terminated"
                state = cstate["terminated"]
                start_time = from_k8s_date(state.get("startedAt"))
                end_time = from_k8s_date(state.get("finishedAt"))
                if update_start_time and update_start_time > start_time:
                    start_time = update_start_time

                # already counted
                if update_start_time and end_time < update_start_time:
                    print(
                        f"  - {name}: {pod_state}: skipping already counted, "
                        + f"{end_time} < {start_time}"
                    )
                    continue

            if end_time and start_time:
                duration = int((end_time - start_time).total_seconds())
                print(
                    f"  - {name}: {pod_state}: {end_time} - {start_time} = {duration}"
                )
                exec_time += duration
                max_duration = max(duration, max_duration)

        if exec_time:
            await self.org_ops.inc_org_time_stats(oid, exec_time, True)
            status.crawlExecTime += exec_time
            status.elapsedCrawlTime += max_duration

        print(
            f"  Exec Time Total: {status.crawlExecTime}, Incremented By: {exec_time}",
            flush=True,
        )

        await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time, now)
        status.lastUpdatedTime = to_k8s_date(now)

    def should_mark_waiting(self, state, started):
        """Should the crawl be marked as waiting for capacity?"""
        if state in RUNNING_STATES:
            return True

        if state == "starting":
            started = from_k8s_date(started)
            return (dt_now() - started).total_seconds() > STARTING_TIME_SECS

        return False

    async def add_used_stats(self, crawl_id, pod_status, redis, metrics):
        """load current usage stats"""
        if redis:
            stats = await redis.info("persistence")
            storage = int(stats.get("aof_current_size", 0)) + int(
                stats.get("current_cow_size", 0)
            )
            pod_info = pod_status[f"redis-{crawl_id}"]
            pod_info.used.storage = storage

            # if no pod metrics, get memory estimate from redis itself
            if not self.k8s.has_pod_metrics:
                stats = await redis.info("memory")
                pod_info.used.memory = int(stats.get("used_memory_rss", 0))

                # stats = await redis.info("cpu")
                # pod_info.used.cpu = float(stats.get("used_cpu_sys", 0))

        for name, metric in metrics.items():
            usage = metric["containers"][0]["usage"]
            pod_info = pod_status[name]
            pod_info.used.memory = int(parse_quantity(usage["memory"]))
            pod_info.used.cpu = float(parse_quantity(usage["cpu"]))

    def handle_auto_size(self, _, pod_status):
        """auto scale pods here, experimental"""
        for name, pod in pod_status.items():
            # if pod crashed due to OOM, increase mem
            # if pod.isNewExit and pod.reason == "oom":
            #    pod.newMemory = int(float(pod.allocated.memory) * 1.2)
            #    print(f"Resizing pod {name} -> mem {pod.newMemory} - OOM Detected")

            # if redis is using >0.90 of its memory, increase mem
            if name.startswith("redis") and pod.get_percent_memory() > 0.90:
                pod.newMemory = int(float(pod.allocated.memory) * 1.2)
                print(f"Resizing pod {name} -> mem {pod.newMemory} - Redis Capacity")

    async def log_crashes(self, crawl_id, pod_status, redis):
        """report/log any pod crashes here"""
        for name, pod in pod_status.items():
            # log only unexpected exits as crashes
            # - 0 is success / intended shutdown
            # - 11 is default interrupt / intended restart
            # - 13 is force interrupt / intended restart
            if not pod.isNewExit or pod.exitCode in (0, 11, 13):
                continue

            log = self.get_log_line(
                "Crawler Instance Crashed", {"reason": pod.reason, "pod": name}
            )
            if not redis:
                print(log)
            else:
                await redis.lpush(f"{crawl_id}:e", log)

    def get_log_line(self, message, details):
        """get crawler error line for logging"""
        err = {
            "timestamp": dt_now().isoformat(),
            "logLevel": "error",
            "context": "k8s",
            "message": message,
            "details": details,
        }
        return json.dumps(err)

    async def add_file_to_crawl(self, cc_data, crawl, redis):
        """Handle finished CrawlFile to db"""

        filecomplete = CrawlCompleteIn(**cc_data)

        org = await self.org_ops.get_org_by_id(crawl.oid)

        filename = self.storage_ops.get_org_relative_path(
            org, crawl.storage, filecomplete.filename
        )

        crawl_file = CrawlFile(
            filename=filename,
            size=filecomplete.size,
            hash=filecomplete.hash,
            crc32=filecomplete.crc32,
            storage=crawl.storage,
        )

        await redis.incr("filesAddedSize", filecomplete.size)

        await self.crawl_ops.add_crawl_file(crawl.id, crawl_file, filecomplete.size)

        try:
            await self.background_job_ops.create_replica_jobs(
                crawl.oid, crawl_file, crawl.id, "crawl"
            )
        # pylint: disable=broad-except
        except Exception as exc:
            print("Replicate Exception", exc, flush=True)

        return True

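    # Graceful-stop checks, evaluated each sync: explicit user stop, crawl
    # timeout, maximum crawl size, and the org's execution-minutes quota. The
    # resulting stopReason is saved on the status and signaled to the crawler
    # pods through the "<crawl_id>:stopping" redis key in update_crawl_state().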
    async def is_crawl_stopping(
        self, crawl: CrawlSpec, status: CrawlStatus
    ) -> Optional[str]:
        """check if crawl is stopping and set reason"""
        # if user requested stop, then enter stopping phase
        if crawl.stopping:
            print("Graceful Stop: User requested stop")
            return "stopped_by_user"

        # stop gracefully if elapsed crawl time exceeds the timeout
        if crawl.timeout:
            elapsed = (
                status.elapsedCrawlTime
                + (dt_now() - from_k8s_date(status.lastUpdatedTime)).total_seconds()
            )
            if elapsed > crawl.timeout:
                print(
                    f"Graceful Stop: Crawl running time exceeded {crawl.timeout} second timeout"
                )
                return "time-limit"

        # crawl size limit
        if crawl.max_crawl_size and status.size > crawl.max_crawl_size:
            print(f"Graceful Stop: Maximum crawl size {crawl.max_crawl_size} hit")
            return "size-limit"

        # check exec time quotas and stop if reached limit
        if await self.org_ops.exec_mins_quota_reached(crawl.oid):
            return "stopped_quota_reached"

        return None

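    # Crawl stats come straight from redis: "<crawl_id>:d" holds the pages-done
    # count (a plain counter in newer crawlers, a list in crawler <=0.9.0),
    # "<crawl_id>:s" is the set of pages found, and "<crawl_id>:size" maps each
    # crawler pod to the size of its archived data.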
    async def get_redis_crawl_stats(self, redis: Redis, crawl_id: str):
        """get page stats"""
        try:
            # crawler >0.9.0, done key is a value
            pages_done = int(await redis.get(f"{crawl_id}:d") or 0)
        except exceptions.ResponseError:
            # crawler <=0.9.0, done key is a list
            pages_done = await redis.llen(f"{crawl_id}:d")

        pages_found = await redis.scard(f"{crawl_id}:s")
        sizes = await redis.hgetall(f"{crawl_id}:size")
        archive_size = sum(int(x) for x in sizes.values())

        stats = {"found": pages_found, "done": pages_done, "size": archive_size}
        return stats, sizes

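    # Completion is driven by the per-pod "<crawl_id>:status" hash: the crawl is
    # marked complete (or stopped_by_user / stopped_quota_reached) only when
    # every crawler pod reports "done" and has exited successfully; if all pods
    # report "failed", the crawl is failed, or canceled when it was stopping
    # with no pages done. Intermediate per-pod states such as generate-wacz are
    # surfaced as the overall crawl state.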
    async def update_crawl_state(
        self,
        redis: Redis,
        crawl: CrawlSpec,
        status: CrawlStatus,
        pods: dict[str, dict],
        done: bool,
    ) -> CrawlStatus:
        """update crawl state and check if crawl is now done"""
        results = await redis.hgetall(f"{crawl.id}:status")
        stats, sizes = await self.get_redis_crawl_stats(redis, crawl.id)

        # need to add size of previously completed WACZ files as well!
        stats["size"] += status.filesAddedSize

        # update status
        status.pagesDone = stats["done"]
        status.pagesFound = stats["found"]
        status.size = stats["size"]
        status.sizeHuman = humanize.naturalsize(status.size)

        await self.crawl_ops.update_running_crawl_stats(crawl.id, stats)

        for key, value in sizes.items():
            value = int(value)
            if value > 0 and status.podStatus:
                pod_info = status.podStatus[key]
                pod_info.used.storage = value

        if not status.stopReason:
            status.stopReason = await self.is_crawl_stopping(crawl, status)
            status.stopping = status.stopReason is not None

        # mark crawl as stopping
        if status.stopping:
            await redis.set(f"{crawl.id}:stopping", "1")
            # backwards compatibility with older crawler
            await redis.set("crawl-stop", "1")

        # resolve scale
        if crawl.scale != status.scale:
            status.scale = await self._resolve_scale(
                crawl.id, crawl.scale, redis, status, pods
            )

        # check if done / failed
        status_count: dict[str, int] = {}
        for i in range(crawl.scale):
            res = results.get(f"crawl-{crawl.id}-{i}")
            if res:
                status_count[res] = status_count.get(res, 0) + 1

        # check if all crawlers are done
        if done and status_count.get("done", 0) >= crawl.scale:
            # check if one-page crawls actually succeeded
            # if only one page found, and no files, assume failed
            if status.pagesFound == 1 and not status.filesAdded:
                await self.fail_crawl(
                    crawl.id, crawl.cid, crawl.oid, status, pods, stats
                )
                return status

            if status.stopReason in ("stopped_by_user", "stopped_quota_reached"):
                state = status.stopReason
            else:
                state = "complete"

            await self.mark_finished(
                crawl.id, crawl.cid, crawl.oid, status, state, crawl, stats
            )

        # check if all crawlers failed
        elif status_count.get("failed", 0) >= crawl.scale:
            # if stopping, and no pages finished, mark as canceled
            if status.stopping and not status.pagesDone:
                await self.mark_finished(
                    crawl.id, crawl.cid, crawl.oid, status, "canceled", crawl, stats
                )
            else:
                await self.fail_crawl(
                    crawl.id, crawl.cid, crawl.oid, status, pods, stats
                )

        # check for other statuses
        else:
            new_status = None
            if status_count.get("running"):
                if status.state in ("generate-wacz", "uploading-wacz", "pending-wait"):
                    new_status = "running"

            elif status_count.get("generate-wacz"):
                new_status = "generate-wacz"
            elif status_count.get("uploading-wacz"):
                new_status = "uploading-wacz"
            elif status_count.get("pending-wait"):
                new_status = "pending-wait"
            if new_status:
                await self.set_state(
                    new_status, status, crawl.id, allowed_from=RUNNING_STATES
                )

        return status

    # pylint: disable=too-many-arguments
    async def mark_finished(
        self,
        crawl_id: str,
        cid: UUID,
        oid: UUID,
        status: CrawlStatus,
        state: str,
        crawl=None,
        stats=None,
    ) -> bool:
        """mark crawl as finished, set finished timestamp and final state"""

        finished = dt_now()

        kwargs = {"finished": finished}
        if stats:
            kwargs["stats"] = stats

        if state in SUCCESSFUL_STATES:
            allowed_from = RUNNING_STATES
        else:
            allowed_from = RUNNING_AND_STARTING_STATES

        # if set_state returns false, already set to same status, return
        if not await self.set_state(
            state, status, crawl_id, allowed_from=allowed_from, **kwargs
        ):
            print("already finished, ignoring mark_finished")
            if not status.finished:
                status.finished = to_k8s_date(finished)

            return False

        status.finished = to_k8s_date(finished)

        if crawl and state in SUCCESSFUL_STATES:
            await self.inc_crawl_complete_stats(crawl, finished)

        self.run_task(
            self.do_crawl_finished_tasks(
                crawl_id, cid, oid, status.filesAddedSize, state
            )
        )

        return True

    # pylint: disable=too-many-arguments
    async def do_crawl_finished_tasks(
        self,
        crawl_id: str,
        cid: UUID,
        oid: UUID,
        files_added_size: int,
        state: str,
    ) -> None:
        """Run tasks after crawl completes in asyncio.task coroutine."""
        await self.crawl_config_ops.stats_recompute_last(cid, files_added_size, 1)

        if state in SUCCESSFUL_STATES and oid:
            await self.org_ops.inc_org_bytes_stored(oid, files_added_size, "crawl")
            await self.coll_ops.add_successful_crawl_to_collections(crawl_id, cid)

        if state in FAILED_STATES:
            await self.crawl_ops.delete_crawl_files(crawl_id, oid)
            await self.page_ops.delete_crawl_pages(crawl_id, oid)

        await self.event_webhook_ops.create_crawl_finished_notification(
            crawl_id, oid, state
        )

        # finally, delete job
        await self.k8s.delete_crawl_job(crawl_id)

    async def inc_crawl_complete_stats(self, crawl, finished):
        """Increment Crawl Stats"""

        started = from_k8s_date(crawl.started)

        duration = int((finished - started).total_seconds())

        print(f"Duration: {duration}", flush=True)

        await self.org_ops.inc_org_time_stats(crawl.oid, duration)

    async def mark_for_cancelation(self, crawl_id):
        """mark crawl as canceled in redis"""
        # ensure 'redis' is defined for the finally block even if connecting fails
        redis = None
        try:
            redis_url = self.k8s.get_redis_url(crawl_id)
            redis = await self._get_redis(redis_url)
            if not redis:
                return False

            await redis.set(f"{crawl_id}:canceled", "1")
            return True
        finally:
            if redis:
                await redis.close()