browsertrix/backend/btrixcloud/crawl_updater.py
Ilya Kreymer 0c8a5a49b4 refactor to use docker swarm for local alternative to k8s instead of docker compose (#247):
- use python-on-whales to drive the docker CLI directly, creating a docker stack for each crawl or profile browser (see the sketch after this summary)
- configure storages via storages.yaml secret
- add crawl_job, profile_job, splitting into base and k8s/swarm implementations
- split manager into base crawlmanager and k8s/swarm implementations
- swarm: load initial scale from db to avoid modifying fixed configs; in k8s, load from configmap
- swarm: support scheduled jobs via swarm-cronjob service
- remove docker dependencies (aiodocker, apscheduler, scheduling)
- swarm: when using local minio, expose via /data/ route in nginx via extra include (in k8s, include dir is empty and routing handled via ingress)
- k8s: cleanup minio chart: move init containers to minio.yaml
- swarm: stateful set implementation to be consistent with k8s scaling:
  - don't use service replicas,
  - create a unique service with '-N' appended and allocate unique volume for each replica
  - allows crawl containers to be restarted w/o losing data
- add volume pruning background service, as volumes can be deleted only after service shuts down fully
- watch: fully simplify routing, route via replica index instead of ip for both k8s and swarm
- rename network btrix-cloud-net -> btrix-net to avoid conflict with compose network
2022-06-05 10:37:17 -07:00
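
As a rough illustration of the swarm path described above, a per-crawl stack might be deployed through python-on-whales roughly as sketched below; the stack name, compose file path and helper names are illustrative assumptions, not the actual crawl manager code:

    from python_on_whales import docker

    def start_crawl_stack(crawl_id: str, compose_file: str):
        # each crawl (or profile browser) gets its own stack so it can be
        # scaled and torn down independently of the rest of the deployment
        docker.stack.deploy(f"crawl-{crawl_id}", compose_files=[compose_file])

    def stop_crawl_stack(crawl_id: str):
        docker.stack.remove(f"crawl-{crawl_id}")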


""" Create and Update Running Crawl within Crawl Job """
import os
import json
import uuid
from datetime import datetime
import asyncio
from redis import asyncio as aioredis
import pymongo
from .db import init_db
from .crawls import Crawl, CrawlFile, CrawlCompleteIn, dt_now
# =============================================================================
# pylint: disable=too-many-instance-attributes,bare-except
class CrawlUpdater:
""" Crawl Update """
started: datetime
finished: datetime
def __init__(self, id_, job):
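        # identifiers and storage settings come from environment variables
        # (set by the surrounding crawl job / configmap)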
        _, mdb = init_db()

        self.archives = mdb["archives"]
        self.crawls = mdb["crawls"]
        self.crawl_configs = mdb["crawl_configs"]

        self.crawl_id = id_

        self.crawls_done_key = "crawls-done"

        self.aid = uuid.UUID(os.environ["ARCHIVE_ID"])
        self.cid = uuid.UUID(os.environ["CRAWL_CONFIG_ID"])
        self.userid = uuid.UUID(os.environ["USER_ID"])

        self.is_manual = os.environ.get("RUN_MANUAL") == "1"

        self.scale = int(os.environ.get("INITIAL_SCALE") or 0)

        self.storage_path = os.environ.get("STORE_PATH")
        self.storage_name = os.environ.get("STORAGE_NAME")

        self.last_done = None
        self.last_found = None
        self.redis = None

        self.job = job

        self.started = dt_now()
        self.finished = None

    async def init_crawl_updater(self, redis_url, scale=None):
        """ init crawl, then init redis, wait for valid connection """
        # keep scale from INITIAL_SCALE unless explicitly overridden
        if scale:
            self.scale = scale
        await self.init_crawl()

        prev_start_time = None

        retry = 3

        # init redis
        while True:
            try:
                self.redis = await aioredis.from_url(
                    redis_url, encoding="utf-8", decode_responses=True
                )

                prev_start_time = await self.redis.get("start_time")

                print("Redis Connected!", flush=True)
                break
            except:
                print(f"Retrying redis connection in {retry}", flush=True)
                await asyncio.sleep(retry)

        if prev_start_time:
            try:
                self.started = datetime.fromisoformat(prev_start_time)
            except:
                pass
        else:
            await self.redis.set("start_time", str(self.started))

        # run redis loop
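        # each message on the "crawls-done" list is expected to describe a
        # finished WACZ upload (filename, size, hash -- see CrawlCompleteIn);
        # stats and crawl state are refreshed on every iteration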
        while True:
            try:
                result = await self.redis.blpop(self.crawls_done_key, timeout=5)
                if result:
                    msg = json.loads(result[1])
                    # add completed file
                    if msg.get("filename"):
                        await self.add_file_to_crawl(msg)

                # update stats
                await self.update_running_crawl_stats(self.crawl_id)

                # check crawl status
                await self.check_crawl_status()

            # pylint: disable=broad-except
            except Exception as exc:
                print(f"Retrying crawls done loop: {exc}")
                await asyncio.sleep(10)

    async def check_crawl_status(self):
        """ crawl is done when all crawl workers have set their done state """
        results = await self.redis.hvals(f"{self.crawl_id}:status")

        # count instances that are done; bail out early if any are still running
        done = 0
        for res in results:
            if res == "done":
                done += 1
            else:
                return

        # if all instances are done, finish the crawl and delete the job
        if done >= self.scale:
            await self.finish_crawl()

            await self.job.delete_crawl()

    async def update_scale(self, new_scale):
        """ dynamically set the scale of a running crawl """
        self.scale = new_scale
        await self.update_crawl(scale=new_scale)

    async def finish_crawl(self):
        """ finish crawl """
        if self.finished:
            return

        self.finished = dt_now()

        completed = self.last_done and self.last_done == self.last_found

        state = "complete" if completed else "partial_complete"
        print("marking crawl as: " + state, flush=True)

        await self.update_crawl(state=state, finished=self.finished)

        if completed:
            await self.inc_crawl_complete_stats(state)

    async def inc_crawl_complete_stats(self, state):
        """ Increment Crawl Stats """

        duration = int((self.finished - self.started).total_seconds())

        print(f"Duration: {duration}", flush=True)

        # update crawl config stats
        await self.crawl_configs.find_one_and_update(
            {"_id": self.cid, "inactive": {"$ne": True}},
            {
                "$inc": {"crawlCount": 1},
                "$set": {
                    "lastCrawlId": self.crawl_id,
                    "lastCrawlTime": self.finished,
                    "lastCrawlState": state,
                },
            },
        )

        # update archive usage stats for the current month
        yymm = datetime.utcnow().strftime("%Y-%m")

        await self.archives.find_one_and_update(
            {"_id": self.aid}, {"$inc": {f"usage.{yymm}": duration}}
        )

    async def update_running_crawl_stats(self, crawl_id):
        """ update stats for running crawl """
        done = await self.redis.llen(f"{crawl_id}:d")
        found = await self.redis.scard(f"{crawl_id}:s")

        if self.last_done == done and self.last_found == found:
            return

        stats = {"found": found, "done": done}

        # transition to "running" state once the first pages have been found
        if not self.last_found and found:
            await self.update_crawl(state="running", stats=stats)
        else:
            await self.update_crawl(stats=stats)

        self.last_found = found
        self.last_done = done

    async def update_crawl(self, **kwargs):
        """ update crawl state, and optionally mark as finished """
        await self.crawls.find_one_and_update({"_id": self.crawl_id}, {"$set": kwargs})

    async def init_crawl(self):
        """ create crawl if it doesn't exist, otherwise mark existing crawl as starting """
        try:
            crawl = self._make_crawl("starting", self.scale)
            await self.crawls.insert_one(crawl.to_dict())
        except pymongo.errors.DuplicateKeyError:
            await self.update_crawl(state="starting", scale=self.scale)

    async def add_file_to_crawl(self, cc_data):
        """ add finished CrawlFile to db """
        filecomplete = CrawlCompleteIn(**cc_data)

        inx = None
        filename = None
        if self.storage_path:
            inx = filecomplete.filename.index(self.storage_path)
            filename = filecomplete.filename[inx:] if inx > 0 else filecomplete.filename
            # storage_name = job.metadata.annotations.get("btrix.storage_name")
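
        # def_storage_name is only set when the filename matched self.storage_path
        # above; otherwise the filename is stored as-is with no default storage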
        def_storage_name = self.storage_name if inx else None

        crawl_file = CrawlFile(
            def_storage_name=def_storage_name,
            filename=filename or filecomplete.filename,
            size=filecomplete.size,
            hash=filecomplete.hash,
        )

        await self.crawls.find_one_and_update(
            {"_id": self.crawl_id},
            {
                "$push": {"files": crawl_file.dict()},
            },
        )

        return True

    async def stop_crawl(self, graceful=True):
        """ mark crawl as stopped or canceled """
        if graceful:
            await self.update_crawl(state="stopping")
        else:
            self.finished = dt_now()
            await self.update_crawl(state="canceled", finished=self.finished)

    async def _get_running_stats(self, crawl_id):
        """ get stats from redis for running or finished crawl """

    async def load_initial_scale(self):
        """ load scale from config if not set """
        if self.scale:
            return self.scale

        try:
            result = await self.crawl_configs.find_one(
                {"_id": self.cid}, {"scale": True}
            )
            return result["scale"]

        # pylint: disable=broad-except
        except Exception as exc:
            print(exc)
            return 1

    def _make_crawl(self, state, scale):
        """ Create crawl object for partial or fully complete crawl """
        return Crawl(
            id=self.crawl_id,
            state=state,
            userid=self.userid,
            aid=self.aid,
            cid=self.cid,
            manual=self.is_manual,
            scale=scale,
            started=self.started,
            # colls=json.loads(job.metadata.annotations.get("btrix.colls", [])),
        )
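
A hypothetical usage sketch (not part of this module): assuming the surrounding crawl job supplies a crawl id, a job object exposing delete_crawl(), and the URL of the crawl's redis instance, the updater might be driven like this:

    # sketch only: `job`, `crawl_id` and `redis_url` are assumed to come from
    # the crawl job; they are not defined in this module
    async def run(job, crawl_id, redis_url):
        updater = CrawlUpdater(crawl_id, job)
        scale = await updater.load_initial_scale()
        # runs until the crawl finishes and the job deletes itself
        await updater.init_crawl_updater(redis_url, scale=scale)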