diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index f103bde5..5e62553c 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -3,12 +3,15 @@
 import asyncio
 import uuid
 import os
+import json
+import re
 
 from typing import Optional, List, Dict, Union
 from datetime import datetime, timedelta
 
 from fastapi import Depends, HTTPException
 from pydantic import BaseModel, UUID4, conint
+from redis import asyncio as aioredis, exceptions
 
 import pymongo
 
@@ -20,21 +23,21 @@ from .storages import get_presigned_url
 
 # ============================================================================
 class DeleteCrawlList(BaseModel):
-    """ delete crawl list POST body """
+    """delete crawl list POST body"""
 
     crawl_ids: List[str]
 
 
 # ============================================================================
 class CrawlScale(BaseModel):
-    """ scale the crawl to N parallel containers """
+    """scale the crawl to N parallel containers"""
 
     scale: conint(ge=1, le=MAX_CRAWL_SCALE) = 1
 
 
 # ============================================================================
 class CrawlFile(BaseModel):
-    """ file from a crawl """
+    """file from a crawl"""
 
     filename: str
     hash: str
@@ -47,7 +50,7 @@ class CrawlFile(BaseModel):
 
 # ============================================================================
 class CrawlFileOut(BaseModel):
-    """ output for file from a crawl (conformance to Data Resource Spec) """
+    """output for file from a crawl (conformance to Data Resource Spec)"""
 
     name: str
     path: str
@@ -57,7 +60,7 @@ class CrawlFileOut(BaseModel):
 
 # ============================================================================
 class Crawl(BaseMongoModel):
-    """ Store State of a Crawl (Finished or Running) """
+    """Store State of a Crawl (Finished or Running)"""
 
     id: str
 
@@ -85,7 +88,7 @@ class Crawl(BaseMongoModel):
 
 # ============================================================================
 class CrawlOut(Crawl):
-    """ Output for single crawl, add configName and userName"""
+    """Output for single crawl, add configName and userName"""
 
     userName: Optional[str]
     configName: Optional[str]
@@ -96,7 +99,7 @@ class CrawlOut(Crawl):
 
 # ============================================================================
 class ListCrawlOut(BaseMongoModel):
-    """ Crawl output model for list view """
+    """Crawl output model for list view"""
 
     id: str
 
@@ -124,14 +127,14 @@ class ListCrawlOut(BaseMongoModel):
 
 # ============================================================================
 class ListCrawls(BaseModel):
-    """ Response model for list of crawls """
+    """Response model for list of crawls"""
 
     crawls: List[ListCrawlOut]
 
 
 # ============================================================================
 class CrawlCompleteIn(BaseModel):
-    """ Completed Crawl Webhook POST message """
+    """Completed Crawl Webhook POST message"""
 
     id: str
 
@@ -146,7 +149,7 @@ class CrawlCompleteIn(BaseModel):
 
 # ============================================================================
 class CrawlOps:
-    """ Crawl Ops """
+    """Crawl Ops"""
 
     # pylint: disable=too-many-arguments, too-many-instance-attributes
     def __init__(self, mdb, users, crawl_manager, crawl_configs, archives):
@@ -155,6 +158,7 @@ class CrawlOps:
         self.crawl_configs = crawl_configs
         self.user_manager = users
         self.archives = archives
+        self.namespace = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"
 
         self.crawl_configs.set_crawl_ops(self)
 
@@ -163,7 +167,7 @@ class CrawlOps:
         asyncio.create_task(self.init_index())
 
     async def init_index(self):
-        """ init index for crawls db """
+ """init index for crawls db""" await self.crawls.create_index("colls") async def list_crawls( @@ -174,7 +178,7 @@ class CrawlOps: exclude_files=True, running_only=False, ): - """List all finished crawls from the db """ + """List all finished crawls from the db""" aid = archive.id if archive else None @@ -233,7 +237,7 @@ class CrawlOps: return crawls async def get_crawl_raw(self, crawlid: str, archive: Archive): - """ Get data for single crawl """ + """Get data for single crawl""" query = {"_id": crawlid} if archive: @@ -247,7 +251,7 @@ class CrawlOps: return res async def get_crawl(self, crawlid: str, archive: Archive): - """ Get data for single crawl """ + """Get data for single crawl""" res = await self.get_crawl_raw(crawlid, archive) @@ -268,7 +272,7 @@ class CrawlOps: async def _resolve_crawl_refs( self, crawl: Union[CrawlOut, ListCrawlOut], archive: Archive ): - """ Resolve running crawl data """ + """Resolve running crawl data""" config = await self.crawl_configs.get_crawl_config( crawl.cid, archive, active_only=False ) @@ -331,14 +335,14 @@ class CrawlOps: await self.crawls.find_one_and_update(*update) async def delete_crawls(self, aid: uuid.UUID, delete_list: DeleteCrawlList): - """ Delete a list of crawls by id for given archive """ + """Delete a list of crawls by id for given archive""" res = await self.crawls.delete_many( {"_id": {"$in": delete_list.crawl_ids}, "aid": aid} ) return res.deleted_count async def add_new_crawl(self, crawl_id: str, crawlconfig): - """ initialize new crawl """ + """initialize new crawl""" crawl = Crawl( id=crawl_id, state="starting", @@ -358,7 +362,7 @@ class CrawlOps: return False async def update_crawl_state(self, crawl_id: str, state: str): - """ called only when job container is being stopped/canceled """ + """called only when job container is being stopped/canceled""" data = {"state": state} # if cancelation, set the finish time here @@ -374,7 +378,7 @@ class CrawlOps: ) async def shutdown_crawl(self, crawl_id: str, archive: Archive, graceful: bool): - """ stop or cancel specified crawl """ + """stop or cancel specified crawl""" result = None try: result = await self.crawl_manager.shutdown_crawl( @@ -405,13 +409,73 @@ class CrawlOps: # return whatever detail may be included in the response raise HTTPException(status_code=400, detail=result.get("error")) + async def get_crawl_queue(self, crawl_id, offset, count, regex): + """ get crawl queue """ + + total = 0 + results = [] + redis = None + + try: + redis = await aioredis.from_url( + self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True + ) + + total = await redis.llen(f"{crawl_id}:q") + results = await redis.lrange(f"{crawl_id}:q", offset, count) + results = [json.loads(result)["url"] for result in results] + except exceptions.ConnectionError: + # can't connect to redis, likely not initialized yet + pass + + matched = [] + if regex: + regex = re.compile(regex) + matched = [result for result in results if regex.search(result)] + + return {"total": total, "results": results, "matched": matched} + + async def match_crawl_queue(self, crawl_id, regex): + """ get crawl queue """ + + total = 0 + + try: + redis = await aioredis.from_url( + self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True + ) + + total = await redis.llen(f"{crawl_id}:q") + except exceptions.ConnectionError: + # can't connect to redis, likely not initialized yet + pass + + matched = [] + regex = re.compile(regex) + + step = 50 + + for count in range(0, total, step): + results = await 
redis.lrange(f"{crawl_id}:q", count, count + step) + for result in results: + url = json.loads(result)["url"] + if regex.search(url): + matched.append(url) + + return {"total": total, "matched": matched} + + def get_redis_url(self, crawl_id): + """ get redis url for crawl id """ + # pylint: disable=line-too-long + return f"redis://redis-{crawl_id}-0.redis-{crawl_id}.{self.namespace}.svc.cluster.local/0" + # ============================================================================ # pylint: disable=too-many-arguments, too-many-locals def init_crawls_api( app, mdb, users, crawl_manager, crawl_config_ops, archives, user_dep ): - """ API for crawl management, including crawl done callback""" + """API for crawl management, including crawl done callback""" ops = CrawlOps(mdb, users, crawl_manager, crawl_config_ops, archives) @@ -505,14 +569,40 @@ def init_crawls_api( if await ops.get_crawl_raw(crawl_id, archive): return {} + @app.get( + "/archives/{aid}/crawls/{crawl_id}/queue", + tags=["crawls"], + ) + async def get_crawl_queue( + crawl_id, + offset: int, + count: int, + regex: Optional[str] = "", + archive: Archive = Depends(archive_crawl_dep), + ): + await ops.get_crawl_raw(crawl_id, archive) + + return await ops.get_crawl_queue(crawl_id, offset, count, regex) + + @app.get( + "/archives/{aid}/crawls/{crawl_id}/queueMatchAll", + tags=["crawls"], + ) + async def match_crawl_queue( + crawl_id, regex: str, archive: Archive = Depends(archive_crawl_dep) + ): + await ops.get_crawl_raw(crawl_id, archive) + + return await ops.match_crawl_queue(crawl_id, regex) + return ops def dt_now(): - """ get current ts """ + """get current ts""" return datetime.utcnow().replace(microsecond=0, tzinfo=None) def ts_now(): - """ get current ts """ + """get current ts""" return str(dt_now())