add completed crawls to crawls table

Ilya Kreymer 2021-08-20 23:52:24 -07:00
parent 4b08163ead
commit ea9010bf9a
3 changed files with 33 additions and 16 deletions

View File

@@ -226,7 +226,7 @@ class ArchiveOps:
         res = await self.archives.find_one_and_update(
             {"_id": aid}, {"$inc": {f"usage.{yymm}": amount}}
         )
-        print(res)
+        print(res, flush=True)
         return res is not None
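For context on the ArchiveOps hunk: the $inc update bumps a per-month usage counter stored on the archive document itself. Below is a minimal, self-contained sketch of the same pattern against an async Mongo (Motor) collection; the connection string, database and collection names, and the hard-coded yymm key are assumptions for illustration, not part of this commit.

# Sketch only: illustrates the $inc usage-counter update from the hunk above.
# Connection string, database/collection names and the "202108" key are assumed.
from motor.motor_asyncio import AsyncIOMotorClient

async def inc_usage_example(aid: str, amount: int) -> bool:
    archives = AsyncIOMotorClient("mongodb://localhost:27017")["example-db"]["archives"]
    yymm = "202108"
    # $inc creates usage.202108 on first use and adds to it on later calls, so the
    # archive document ends up like {"_id": aid, "usage": {"202108": 3600}}
    res = await archives.find_one_and_update(
        {"_id": aid}, {"$inc": {f"usage.{yymm}": amount}}
    )
    return res is not None

# usage (from an async context): ok = await inc_usage_example("example-archive-id", 3600)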

View File

@@ -5,11 +5,10 @@ import asyncio
 from typing import Optional
 from datetime import datetime
-from pydantic import BaseModel
+from db import BaseMongoModel
 
 
 # ============================================================================
-class CrawlComplete(BaseModel):
+class CrawlComplete(BaseMongoModel):
     """ Store State of Completed Crawls """
 
     id: str
@@ -27,29 +26,43 @@ class CrawlComplete(BaseModel):
 
 
 # ============================================================================
-def init_crawls_api(app, crawl_manager, users, archives):
-    """ API for crawl management, including crawl done callback"""
+class CrawlOps:
+    """ Crawl Ops """
 
-    async def on_handle_crawl_complete(msg: CrawlComplete):
-        if not await crawl_manager.validate_crawl_complete(msg):
+    def __init__(self, mdb, crawl_manager, users, archives):
+        self.crawls = mdb["crawls"]
+        self.crawl_manager = crawl_manager
+        self.users = users
+        self.archives = archives
+
+    async def on_handle_crawl_complete(self, msg: CrawlComplete):
+        """ Handle completed crawl, add to crawls db collection, also update archive usage """
+        if not await self.crawl_manager.validate_crawl_complete(msg):
             print("Not a valid crawl complete msg!", flush=True)
             return
 
         print(msg, flush=True)
+        await self.crawls.insert_one(msg.to_dict())
+
         dura = int((msg.finished - msg.started).total_seconds())
         print(f"Duration: {dura}", flush=True)
 
-        await users.inc_usage(msg.user, dura)
-        await archives.inc_usage(msg.aid, dura)
+        await self.archives.inc_usage(msg.aid, dura)
+
+    async def delete_crawl(self, cid: str, aid: str):
+        """ Delete crawl by id """
+        return await self.crawls.delete_one({"_id": cid, "aid": aid})
+
+
+# ============================================================================
+def init_crawls_api(app, mdb, crawl_manager, users, archives):
+    """ API for crawl management, including crawl done callback"""
+
+    ops = CrawlOps(mdb, crawl_manager, users, archives)
 
     @app.post("/crawls/done")
     async def webhook(msg: CrawlComplete):
-        # background_tasks.add_task(on_handle_crawl_complete, msg)
-        # asyncio.ensure_future(on_handle_crawl_complete(msg))
         loop = asyncio.get_running_loop()
-        loop.create_task(on_handle_crawl_complete(msg))
-        # await on_handle_crawl_complete(msg)
+        loop.create_task(ops.on_handle_crawl_complete(msg))
 
         return {"message": "webhook received"}

View File

@@ -73,7 +73,11 @@ class BrowsertrixAPI:
         )
 
         init_crawls_api(
-            self.app, self.crawl_manager, self.fastapi_users.db, self.archive_ops
+            self.app,
+            self.mdb,
+            self.crawl_manager,
+            self.fastapi_users.db,
+            self.archive_ops,
         )
 
         self.app.include_router(self.archive_ops.router)
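The BrowsertrixAPI hunk only threads the Mongo database handle (self.mdb) through to init_crawls_api() so CrawlOps can grab its "crawls" collection. How self.mdb is created is outside this diff; below is a sketch of one way it could be set up with Motor, with the env var and database name as assumptions.

# Sketch only: one way self.mdb could be initialized before init_crawls_api()
# runs; the MONGO_HOST env var and database name are assumptions.
import os
from motor.motor_asyncio import AsyncIOMotorClient

def init_db():
    url = os.environ.get("MONGO_HOST", "mongodb://localhost:27017")
    client = AsyncIOMotorClient(url)
    return client["example-db"]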