Crawl Queue API (#342)

* crawl queue api work: (#329)
- add GET /crawls/{crawl_id}/queue API to page through the crawl queue, with 'offset', 'count', and an optional 'regex' query arg; returns the requested page of queued URLs, the regex matches within that page, and the total number of URLs in the queue.
- add GET /crawls/{crawl_id}/queueMatchAll API with a 'regex' query arg, which scans the entire crawl queue on the backend and returns the full list of matching URLs (more experimental). See the usage sketch after this list.
- if the crawl has not yet started or redis is not yet available, return an empty queue
- only supported for the k8s deployment at the moment
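A minimal usage sketch for the two new endpoints. The base URL, bearer token, archive id, and crawl id below are hypothetical placeholders; only the route paths, query args, and response keys ("total", "results", "matched") come from the code in this commit.

import requests

# Hypothetical values -- substitute a real API base URL, access token, archive id, and crawl id.
API_BASE = "https://example.com/api"
HEADERS = {"Authorization": "Bearer <access-token>"}
AID = "<archive-uuid>"
CRAWL_ID = "<crawl-id>"

# Page through the first 50 queued URLs, flagging which ones match a regex.
resp = requests.get(
    f"{API_BASE}/archives/{AID}/crawls/{CRAWL_ID}/queue",
    params={"offset": 0, "count": 50, "regex": r"\.pdf$"},
    headers=HEADERS,
)
data = resp.json()
print(data["total"], "urls in queue")
print("this page:", data["results"])
print("matches in this page:", data["matched"])

# Scan the entire queue on the backend for the same regex (heavier, more experimental).
resp = requests.get(
    f"{API_BASE}/archives/{AID}/crawls/{CRAWL_ID}/queueMatchAll",
    params={"regex": r"\.pdf$"},
    headers=HEADERS,
)
print("all matches:", resp.json()["matched"])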
Ilya Kreymer 2022-10-12 19:56:13 -07:00 committed by GitHub
parent 8708c24a74
commit f7836c345d


@@ -3,12 +3,15 @@
 import asyncio
 import uuid
 import os
+import json
+import re
 from typing import Optional, List, Dict, Union
 from datetime import datetime, timedelta
 from fastapi import Depends, HTTPException
 from pydantic import BaseModel, UUID4, conint
+from redis import asyncio as aioredis, exceptions
 import pymongo
@@ -20,21 +23,21 @@ from .storages import get_presigned_url

 # ============================================================================
 class DeleteCrawlList(BaseModel):
-    """ delete crawl list POST body """
+    """delete crawl list POST body"""

     crawl_ids: List[str]


 # ============================================================================
 class CrawlScale(BaseModel):
-    """ scale the crawl to N parallel containers """
+    """scale the crawl to N parallel containers"""

     scale: conint(ge=1, le=MAX_CRAWL_SCALE) = 1


 # ============================================================================
 class CrawlFile(BaseModel):
-    """ file from a crawl """
+    """file from a crawl"""

     filename: str
     hash: str

@@ -47,7 +50,7 @@ class CrawlFile(BaseModel):

 # ============================================================================
 class CrawlFileOut(BaseModel):
-    """ output for file from a crawl (conformance to Data Resource Spec) """
+    """output for file from a crawl (conformance to Data Resource Spec)"""

     name: str
     path: str

@@ -57,7 +60,7 @@ class CrawlFileOut(BaseModel):

 # ============================================================================
 class Crawl(BaseMongoModel):
-    """ Store State of a Crawl (Finished or Running) """
+    """Store State of a Crawl (Finished or Running)"""

     id: str

@@ -85,7 +88,7 @@ class Crawl(BaseMongoModel):

 # ============================================================================
 class CrawlOut(Crawl):
-    """ Output for single crawl, add configName and userName"""
+    """Output for single crawl, add configName and userName"""

     userName: Optional[str]
     configName: Optional[str]

@@ -96,7 +99,7 @@ class CrawlOut(Crawl):

 # ============================================================================
 class ListCrawlOut(BaseMongoModel):
-    """ Crawl output model for list view """
+    """Crawl output model for list view"""

     id: str

@@ -124,14 +127,14 @@ class ListCrawlOut(BaseMongoModel):

 # ============================================================================
 class ListCrawls(BaseModel):
-    """ Response model for list of crawls """
+    """Response model for list of crawls"""

     crawls: List[ListCrawlOut]


 # ============================================================================
 class CrawlCompleteIn(BaseModel):
-    """ Completed Crawl Webhook POST message """
+    """Completed Crawl Webhook POST message"""

     id: str

@@ -146,7 +149,7 @@ class CrawlCompleteIn(BaseModel):

 # ============================================================================
 class CrawlOps:
-    """ Crawl Ops """
+    """Crawl Ops"""

     # pylint: disable=too-many-arguments, too-many-instance-attributes
     def __init__(self, mdb, users, crawl_manager, crawl_configs, archives):
@@ -155,6 +158,7 @@ class CrawlOps:
         self.crawl_configs = crawl_configs
         self.user_manager = users
         self.archives = archives
+        self.namespace = os.environ.get("CRAWLER_NAMESPACE") or "crawlers"

         self.crawl_configs.set_crawl_ops(self)

@@ -163,7 +167,7 @@ class CrawlOps:
         asyncio.create_task(self.init_index())

     async def init_index(self):
-        """ init index for crawls db """
+        """init index for crawls db"""
         await self.crawls.create_index("colls")

     async def list_crawls(
@@ -174,7 +178,7 @@ class CrawlOps:
         exclude_files=True,
         running_only=False,
     ):
-        """List all finished crawls from the db """
+        """List all finished crawls from the db"""
         aid = archive.id if archive else None

@@ -233,7 +237,7 @@ class CrawlOps:
         return crawls

     async def get_crawl_raw(self, crawlid: str, archive: Archive):
-        """ Get data for single crawl """
+        """Get data for single crawl"""
         query = {"_id": crawlid}
         if archive:
@@ -247,7 +251,7 @@ class CrawlOps:
         return res

     async def get_crawl(self, crawlid: str, archive: Archive):
-        """ Get data for single crawl """
+        """Get data for single crawl"""

         res = await self.get_crawl_raw(crawlid, archive)

@@ -268,7 +272,7 @@ class CrawlOps:
     async def _resolve_crawl_refs(
         self, crawl: Union[CrawlOut, ListCrawlOut], archive: Archive
     ):
-        """ Resolve running crawl data """
+        """Resolve running crawl data"""
         config = await self.crawl_configs.get_crawl_config(
             crawl.cid, archive, active_only=False
         )
@@ -331,14 +335,14 @@ class CrawlOps:
         await self.crawls.find_one_and_update(*update)

     async def delete_crawls(self, aid: uuid.UUID, delete_list: DeleteCrawlList):
-        """ Delete a list of crawls by id for given archive """
+        """Delete a list of crawls by id for given archive"""
         res = await self.crawls.delete_many(
             {"_id": {"$in": delete_list.crawl_ids}, "aid": aid}
         )
         return res.deleted_count

     async def add_new_crawl(self, crawl_id: str, crawlconfig):
-        """ initialize new crawl """
+        """initialize new crawl"""
         crawl = Crawl(
             id=crawl_id,
             state="starting",
@@ -358,7 +362,7 @@ class CrawlOps:
         return False

     async def update_crawl_state(self, crawl_id: str, state: str):
-        """ called only when job container is being stopped/canceled """
+        """called only when job container is being stopped/canceled"""
         data = {"state": state}

         # if cancelation, set the finish time here
@@ -374,7 +378,7 @@ class CrawlOps:
         )

     async def shutdown_crawl(self, crawl_id: str, archive: Archive, graceful: bool):
-        """ stop or cancel specified crawl """
+        """stop or cancel specified crawl"""
         result = None
         try:
             result = await self.crawl_manager.shutdown_crawl(
@@ -405,13 +409,73 @@
         # return whatever detail may be included in the response
         raise HTTPException(status_code=400, detail=result.get("error"))

+    async def get_crawl_queue(self, crawl_id, offset, count, regex):
+        """ get crawl queue """
+
+        total = 0
+        results = []
+        redis = None
+
+        try:
+            redis = await aioredis.from_url(
+                self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True
+            )
+
+            total = await redis.llen(f"{crawl_id}:q")
+            results = await redis.lrange(f"{crawl_id}:q", offset, count)
+            results = [json.loads(result)["url"] for result in results]
+        except exceptions.ConnectionError:
+            # can't connect to redis, likely not initialized yet
+            pass
+
+        matched = []
+        if regex:
+            regex = re.compile(regex)
+            matched = [result for result in results if regex.search(result)]
+
+        return {"total": total, "results": results, "matched": matched}
+
+    async def match_crawl_queue(self, crawl_id, regex):
+        """ get crawl queue """
+        total = 0
+
+        try:
+            redis = await aioredis.from_url(
+                self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True
+            )
+
+            total = await redis.llen(f"{crawl_id}:q")
+        except exceptions.ConnectionError:
+            # can't connect to redis, likely not initialized yet
+            pass
+
+        matched = []
+        regex = re.compile(regex)
+
+        step = 50
+
+        for count in range(0, total, step):
+            results = await redis.lrange(f"{crawl_id}:q", count, count + step)
+            for result in results:
+                url = json.loads(result)["url"]
+                if regex.search(url):
+                    matched.append(url)
+
+        return {"total": total, "matched": matched}
+
+    def get_redis_url(self, crawl_id):
+        """ get redis url for crawl id """
+        # pylint: disable=line-too-long
+        return f"redis://redis-{crawl_id}-0.redis-{crawl_id}.{self.namespace}.svc.cluster.local/0"
+

 # ============================================================================
 # pylint: disable=too-many-arguments, too-many-locals
 def init_crawls_api(
     app, mdb, users, crawl_manager, crawl_config_ops, archives, user_dep
 ):
-    """ API for crawl management, including crawl done callback"""
+    """API for crawl management, including crawl done callback"""

     ops = CrawlOps(mdb, users, crawl_manager, crawl_config_ops, archives)

@@ -505,14 +569,40 @@ def init_crawls_api(
         if await ops.get_crawl_raw(crawl_id, archive):
             return {}

+    @app.get(
+        "/archives/{aid}/crawls/{crawl_id}/queue",
+        tags=["crawls"],
+    )
+    async def get_crawl_queue(
+        crawl_id,
+        offset: int,
+        count: int,
+        regex: Optional[str] = "",
+        archive: Archive = Depends(archive_crawl_dep),
+    ):
+        await ops.get_crawl_raw(crawl_id, archive)
+
+        return await ops.get_crawl_queue(crawl_id, offset, count, regex)
+
+    @app.get(
+        "/archives/{aid}/crawls/{crawl_id}/queueMatchAll",
+        tags=["crawls"],
+    )
+    async def match_crawl_queue(
+        crawl_id, regex: str, archive: Archive = Depends(archive_crawl_dep)
+    ):
+        await ops.get_crawl_raw(crawl_id, archive)
+
+        return await ops.match_crawl_queue(crawl_id, regex)
+
     return ops


 def dt_now():
-    """ get current ts """
+    """get current ts"""
     return datetime.utcnow().replace(microsecond=0, tzinfo=None)


 def ts_now():
-    """ get current ts """
+    """get current ts"""
     return str(dt_now())
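For reference, a minimal sketch of the queue layout these methods read, assuming only what the diff above relies on: a Redis list at "{crawl_id}:q" whose entries are JSON objects containing at least a "url" key. The crawl id, Redis URL, and the use of rpush to seed the list are hypothetical illustration, not part of this commit.

import json
import redis  # synchronous redis-py client, used here only for illustration

crawl_id = "example-crawl"  # hypothetical crawl id
r = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)

# Seed a fake queue in the shape the backend expects to find it.
for url in ["https://example.com/", "https://example.com/report.pdf"]:
    r.rpush(f"{crawl_id}:q", json.dumps({"url": url}))

print(r.llen(f"{crawl_id}:q"))           # total, as read by get_crawl_queue()
print(r.lrange(f"{crawl_id}:q", 0, 49))  # one page, as read by get_crawl_queue()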