- run as child process using aioprocessing
- cleanup: support cleanup of orphaned containers
- timeout: support crawlTimeout via check in cleanup loop (see the sketch below)
- support crawl listing + crawl stopping
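
The crawlTimeout check referenced above lives in the crawl manager, not in this file. As a rough sketch of the idea (an assumption, reusing only the crawl-manager calls that crawls.py itself uses, list_running_crawls and stop_crawl, and assuming running crawls expose the id and started fields that the Crawl model below has):

import asyncio
from datetime import datetime, timedelta

async def cleanup_loop(crawl_manager, aid, crawl_timeout, interval=30):
    """ Sketch only: periodically stop running crawls older than crawl_timeout seconds """
    while True:
        for crawl in await crawl_manager.list_running_crawls(aid=aid):
            if datetime.utcnow() - crawl.started > timedelta(seconds=crawl_timeout):
                await crawl_manager.stop_crawl(crawl.id, aid, graceful=True)
        await asyncio.sleep(interval)
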
| """ Crawl API """
 | |
| 
 | |
| import asyncio
 | |
| 
 | |
| from typing import Optional, List
 | |
| from datetime import datetime
 | |
| 
 | |
| from fastapi import Depends, HTTPException
 | |
| from pydantic import BaseModel
 | |
| import pymongo
 | |
| 
 | |
| from db import BaseMongoModel
 | |
| from archives import Archive
 | |
| 
 | |
| 
 | |
| # ============================================================================
 | |
| class DeleteCrawlList(BaseModel):
 | |
|     """ delete crawl list POST body """
 | |
| 
 | |
|     crawl_ids: List[str]
 | |
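    # Example POST body (values are illustrative, not from the source):
    #   {"crawl_ids": ["crawl-id-1", "crawl-id-2"]}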


# ============================================================================
class Crawl(BaseMongoModel):
    """ Store State of a Crawl (Finished or Running) """

    user: str
    aid: str
    cid: str

    schedule: Optional[str]
    manual: Optional[bool]

    started: datetime
    finished: Optional[datetime]

    state: str

    filename: Optional[str]
    size: Optional[int]
    hash: Optional[str]
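    # Illustrative example of a serialized crawl (all values are made up):
    #   {"_id": "crawl-id", "user": "user-id", "aid": "archive-id",
    #    "cid": "crawlconfig-id", "manual": True,
    #    "started": "2021-01-01T00:00:00", "finished": "2021-01-01T00:10:00",
    #    "state": "complete", "filename": "crawl-data.warc.gz",
    #    "size": 123456, "hash": "..."}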


# ============================================================================
class CrawlCompleteIn(BaseModel):
    """ Completed Crawl Webhook POST message """

    id: str

    user: str

    filename: str
    size: int
    hash: str

    completed: Optional[bool] = True
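    # Example webhook payload (values are illustrative, not from the source):
    #   {"id": "crawl-id", "user": "user-id", "filename": "crawl-data.warc.gz",
    #    "size": 123456, "hash": "...", "completed": True}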


# ============================================================================
class CrawlOps:
    """ Crawl Ops """

    def __init__(self, mdb, crawl_manager, crawl_configs, archives):
        self.crawls = mdb["crawls"]
        self.crawl_manager = crawl_manager
        self.crawl_configs = crawl_configs
        self.archives = archives

        self.crawl_manager.set_crawl_ops(self)

    async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
        """ Handle completed crawl, add to crawls db collection, also update archive usage """
        crawl = await self.crawl_manager.validate_crawl_complete(msg)
        if not crawl:
            print("Not a valid crawl complete msg!", flush=True)
            return

        await self.store_crawl(crawl, update_existing=True)

    async def store_crawl(self, crawl: Crawl, update_existing=False):
        """ Add finished crawl to db, increment archive usage """
        if update_existing:
            await self.crawls.find_one_and_replace(
                {"_id": crawl.id}, crawl.to_dict(), upsert=True
            )
        else:
            try:
                await self.crawls.insert_one(crawl.to_dict())
            except pymongo.errors.DuplicateKeyError:
                # print(f"Crawl Already Added: {crawl.id} - {crawl.state}")
                return False

        duration = int((crawl.finished - crawl.started).total_seconds())

        print(crawl, flush=True)
        print(f"Duration: {duration}", flush=True)

        await self.archives.inc_usage(crawl.aid, duration)

        await self.crawl_configs.inc_crawls(crawl.cid)

        return True

    async def list_crawls(self, aid: str, cid: Optional[str] = None):
        """ Get all finished crawls for an archive, optionally filtered by crawl config id """
        query = {"aid": aid}
        if cid:
            query["cid"] = cid

        cursor = self.crawls.find(query)
        results = await cursor.to_list(length=1000)
        return [Crawl.from_dict(res) for res in results]

    async def delete_crawls(self, aid: str, delete_list: DeleteCrawlList):
        """ Delete a list of crawls by id for given archive """
        res = await self.crawls.delete_many(
            {"_id": {"$in": delete_list.crawl_ids}, "aid": aid}
        )
        return res.deleted_count


# ============================================================================
def init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives):
    """ API for crawl management, including crawl done callback """

    ops = CrawlOps(mdb, crawl_manager, crawl_config_ops, archives)

    archive_crawl_dep = archives.archive_crawl_dep

    @app.post("/crawls/done", tags=["crawls"])
    async def crawl_done(msg: CrawlCompleteIn):
        # process the webhook in a background task so the crawler
        # gets an immediate response
        loop = asyncio.get_running_loop()
        loop.create_task(ops.on_handle_crawl_complete(msg))

        return {"success": True}

    @app.get("/archives/{aid}/crawls", tags=["crawls"])
    async def list_crawls(archive: Archive = Depends(archive_crawl_dep)):
        aid = str(archive.id)

        running_crawls = await crawl_manager.list_running_crawls(aid=aid)

        finished_crawls = await ops.list_crawls(aid)

        return {
            "running": [
                crawl.dict(exclude_none=True, exclude_unset=True)
                for crawl in running_crawls
            ],
            "finished": finished_crawls,
        }

    @app.post(
        "/archives/{aid}/crawls/{crawl_id}/cancel",
        tags=["crawls"],
    )
    async def crawl_cancel(
        crawl_id, archive: Archive = Depends(archive_crawl_dep)
    ):
        try:
            crawl = await crawl_manager.stop_crawl(crawl_id, archive.id, graceful=False)
            if not crawl:
                raise HTTPException(
                    status_code=404, detail=f"Crawl not found: {crawl_id}"
                )

            await ops.store_crawl(crawl)

        except HTTPException as httpe:
            raise httpe

        except Exception as exc:
            # pylint: disable=raise-missing-from
            raise HTTPException(status_code=400, detail=f"Error Canceling Crawl: {exc}")

        return {"canceled": True}

    @app.post(
        "/archives/{aid}/crawls/{crawl_id}/stop",
        tags=["crawls"],
    )
    async def crawl_graceful_stop(
        crawl_id, archive: Archive = Depends(archive_crawl_dep)
    ):
        try:
            stopped = await crawl_manager.stop_crawl(
                crawl_id, archive.id, graceful=True
            )
            if not stopped:
                raise HTTPException(
                    status_code=404, detail=f"Crawl not found: {crawl_id}"
                )

        except HTTPException as httpe:
            raise httpe

        except Exception as exc:
            # pylint: disable=raise-missing-from
            raise HTTPException(status_code=400, detail=f"Error Stopping Crawl: {exc}")

        return {"stopped_gracefully": True}

    @app.post("/archives/{aid}/crawls/delete", tags=["crawls"])
    async def delete_crawls(
        delete_list: DeleteCrawlList, archive: Archive = Depends(archive_crawl_dep)
    ):
        res = await ops.delete_crawls(archive.id, delete_list)
        return {"deleted": res}
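
# ============================================================================
# Hypothetical wiring sketch (an assumption, not part of this module): the
# app entrypoint presumably calls init_crawls_api with the FastAPI app, a
# Mongo database handle (mdb["crawls"] above suggests a Motor database
# object), and the crawl manager / crawl config / archive ops objects,
# roughly like (names and db name are illustrative):
#
#   app = FastAPI()
#   mdb = AsyncIOMotorClient(mongo_url)["browsertrix"]
#   init_crawls_api(app, mdb, crawl_manager, crawl_config_ops, archives)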