backend work:

- support {configname}-{username}-@ts-@hostsuffix.wacz as the output filename, sanitizing the username and config name (see the sketch below)
- support returning 'starting' as the crawl status when no pod IPs are available or 0/0 pages have been found
- fix updating scale via the general crawlconfig update endpoint
- fix duplicate user error on superuser init
Ilya Kreymer 2022-03-15 18:18:22 -07:00
parent 4b2f89db91
commit e6467c3374
5 changed files with 57 additions and 34 deletions
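
To illustrate the first bullet, here is a minimal standalone sketch of the sanitization and filename templating that the crawlconfigs.py hunks below add; the helper name sanitize_part and the sample config/user names are hypothetical, and @ts / @hostsuffix are placeholders left for the crawler to expand at run time.

    import re

    # same pattern the commit compiles as self._file_rx: any run of non-word characters
    _file_rx = re.compile(r"\W+")

    def sanitize_part(value: str = "") -> str:
        """Lowercase and replace runs of non-word characters with '-'."""
        return _file_rx.sub("-", value.lower())

    # hypothetical inputs, for illustration only
    config_name = "My Crawl: example.com"
    user_name = "user@example.com"

    out_filename = (
        f"{sanitize_part(config_name)}-{sanitize_part(user_name)}-@ts-@hostsuffix.wacz"
    )
    print(out_filename)
    # my-crawl-example-com-user-example-com-@ts-@hostsuffix.wacz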

View File

@@ -6,6 +6,7 @@ from typing import List, Union, Optional
 from enum import Enum
 import uuid
 import asyncio
+import re
 from datetime import datetime

 import pymongo
@@ -169,6 +170,7 @@ class CrawlConfigOps:
         )

         self.coll_ops = None
+        self._file_rx = re.compile("\\W+")

         asyncio.create_task(self.init_index())
@@ -182,6 +184,10 @@
         """ set collection ops """
         self.coll_ops = coll_ops

+    def sanitize(self, string=""):
+        """ sanitize string for use in wacz filename"""
+        return self._file_rx.sub("-", string.lower())
+
     async def add_crawl_config(
         self, config: CrawlConfigIn, archive: Archive, user: User
     ):
@@ -216,8 +222,14 @@
         crawlconfig = CrawlConfig.from_dict(data)

+        # pylint: disable=line-too-long
+        out_filename = f"{self.sanitize(crawlconfig.name)}-{self.sanitize(user.name)}-@ts-@hostsuffix.wacz"
+
         new_name = await self.crawl_manager.add_crawl_config(
-            crawlconfig=crawlconfig, storage=archive.storage, run_now=config.runNow
+            crawlconfig=crawlconfig,
+            storage=archive.storage,
+            run_now=config.runNow,
+            out_filename=out_filename,
         )

         return result, new_name
@@ -234,10 +246,10 @@
             raise HTTPException(status_code=400, detail="no_update_data")

         # update schedule in crawl manager first
-        if update.schedule is not None:
+        if update.schedule is not None or update.scale is not None:
             try:
-                await self.crawl_manager.update_crawl_schedule(
-                    str(cid), update.schedule
+                await self.crawl_manager.update_crawl_schedule_or_scale(
+                    str(cid), update.schedule, update.scale
                 )
             except Exception:
                 # pylint: disable=raise-missing-from
@@ -443,14 +455,6 @@
     ):
         return await ops.update_crawl_config(uuid.UUID(cid), update)

-    # depcreated: to remove in favor of general patch
-    @router.patch("/{cid}/schedule", dependencies=[Depends(archive_crawl_dep)])
-    async def update_crawl_schedule(
-        update: UpdateCrawlConfig,
-        cid: str,
-    ):
-        return await ops.update_crawl_config(uuid.UUID(cid), update)
-
     @router.post("/{cid}/run")
     async def run_now(
         cid: str,
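
A hedged usage sketch for the third bullet: with the dedicated /{cid}/schedule route removed above, a client updates scale (or schedule) through the general crawlconfig update endpoint. The base URL, route prefix, and auth header below are assumptions not shown in this diff; only the UpdateCrawlConfig body carrying a scale field comes from the commit.

    import httpx

    API = "https://btrix.example.com/api"            # hypothetical deployment URL
    AID = "<archive-uuid>"                           # hypothetical archive id
    CID = "<crawlconfig-uuid>"                       # hypothetical crawl config id
    HEADERS = {"Authorization": "Bearer <token>"}    # hypothetical auth token

    # assumes the crawlconfigs router is mounted under the archive API prefix
    resp = httpx.patch(
        f"{API}/archives/{AID}/crawlconfigs/{CID}",
        json={"scale": 2},                           # now forwarded to the crawl manager
        headers=HEADERS,
    )
    print(resp.status_code, resp.json())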

View File

@@ -422,6 +422,8 @@ class CrawlOps:
         for crawl, (done, total) in zip(crawl_list, pairwise(results)):
             crawl.stats = {"done": done, "found": total}
+            if total == 0 and done == 0 and crawl.state == "running":
+                crawl.state = "starting"

     async def cache_ips(self, crawl: CrawlOut):
         """ cache ips for ws auth check """

View File

@@ -170,7 +170,8 @@ class DockerManager:
     async def update_archive_storage(self, aid, userid, storage):
         """ No storage kept for docker manager """

-    async def add_crawl_config(self, crawlconfig, storage, run_now):
+    # pylint: disable=unused-argument
+    async def add_crawl_config(self, crawlconfig, storage, run_now, out_filename):
         """ Add new crawl config """
         cid = str(crawlconfig.id)
         userid = str(crawlconfig.userid)
@@ -212,7 +213,7 @@
         return ""

-    async def update_crawl_schedule(self, cid, schedule):
+    async def update_crawl_schedule_or_scale(self, cid, schedule=None, scale=None):
         """ Update the schedule for existing crawl config """
         if schedule:

View File

@@ -164,7 +164,7 @@ class K8SManager:
             name=archive_storage_name, namespace=self.namespace, body=crawl_secret
         )

-    async def add_crawl_config(self, crawlconfig, storage, run_now):
+    async def add_crawl_config(self, crawlconfig, storage, run_now, out_filename):
         """add new crawl as cron job, store crawl config in configmap"""
         cid = str(crawlconfig.id)
         userid = str(crawlconfig.userid)
@@ -209,6 +209,7 @@
             storage_path,
             labels,
             annotations,
+            out_filename,
             crawlconfig.crawlTimeout,
             crawlconfig.scale,
         )
@@ -242,8 +243,8 @@
         return ""

-    async def update_crawl_schedule(self, cid, schedule):
-        """ Update the schedule for existing crawl config """
+    async def update_crawl_schedule_or_scale(self, cid, schedule=None, scale=None):
+        """ Update the schedule or scale for existing crawl config """

         cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
             namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
@@ -254,16 +255,25 @@
         cron_job = cron_jobs.items[0]
+        updated = False

-        real_schedule = schedule or DEFAULT_NO_SCHEDULE
+        if schedule is not None:
+            real_schedule = schedule or DEFAULT_NO_SCHEDULE

-        if real_schedule != cron_job.spec.schedule:
-            cron_job.spec.schedule = real_schedule
-            cron_job.spec.suspend = not schedule
+            if real_schedule != cron_job.spec.schedule:
+                cron_job.spec.schedule = real_schedule
+                cron_job.spec.suspend = not schedule

-            cron_job.spec.job_template.metadata.annotations[
-                "btrix.run.schedule"
-            ] = schedule
+                cron_job.spec.job_template.metadata.annotations[
+                    "btrix.run.schedule"
+                ] = schedule
+                updated = True
+
+        if scale is not None:
+            cron_job.spec.job_template.spec.parallelism = scale
+            updated = True

+        if updated:
             await self.batch_beta_api.patch_namespaced_cron_job(
                 name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
             )
@@ -397,17 +407,17 @@
             if not status:
                 return None

-            crawl = self._make_crawl_for_job(job, status, False, CrawlOut)
-
             pods = await self.core_api.list_namespaced_pod(
                 namespace=self.namespace,
                 label_selector=f"job-name={name},btrix.archive={aid}",
             )

-            crawl.watchIPs = [
-                pod.status.pod_ip for pod in pods.items if pod.status.pod_ip
-            ]
+            watch_ips = [pod.status.pod_ip for pod in pods.items if pod.status.pod_ip]

-            return crawl
+            if status == "running" and not watch_ips:
+                status = "starting"
+
+            return self._make_crawl_for_job(job, status, False, CrawlOut, watch_ips)

         # pylint: disable=broad-except
         except Exception:
@@ -517,7 +527,9 @@
         return None

     # pylint: disable=no-self-use
-    def _make_crawl_for_job(self, job, state, finish_now=False, crawl_cls=Crawl):
+    def _make_crawl_for_job(
+        self, job, state, finish_now=False, crawl_cls=Crawl, watch_ips=None
+    ):
         """ Make a crawl object from a job"""
         return crawl_cls(
             id=job.metadata.name,
@@ -529,10 +541,11 @@
             # schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
             manual=job.metadata.annotations.get("btrix.run.manual") == "1",
             started=job.status.start_time.replace(tzinfo=None),
+            watchIPs=watch_ips or [],
+            colls=json.loads(job.metadata.annotations.get("btrix.colls", [])),
             finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None)
             if finish_now
             else None,
-            colls=json.loads(job.metadata.annotations.get("btrix.colls", [])),
         )

     async def _delete_job(self, name):
@@ -669,6 +682,7 @@
         storage_path,
         labels,
         annotations,
+        out_filename,
         crawl_timeout,
         parallel,
     ):
@@ -731,7 +745,7 @@
                            {"name": "STORE_PATH", "value": storage_path},
                            {
                                "name": "STORE_FILENAME",
-                                "value": "@ts-@hostname.wacz",
+                                "value": out_filename,
                            },
                        ],
                        "resources": resources,

View File

@@ -14,6 +14,8 @@ import passlib.pwd
 from fastapi import Request, Response, HTTPException, Depends, WebSocket
 from fastapi.security import OAuth2PasswordBearer

+from pymongo.errors import DuplicateKeyError
+
 from fastapi_users import FastAPIUsers, models, BaseUserManager
 from fastapi_users.manager import UserAlreadyExists
 from fastapi_users.authentication import (
@@ -174,7 +176,7 @@ class UserManager(BaseUserManager[UserCreate, UserDB]):
             print(f"Super user {email} created", flush=True)
             print(res, flush=True)

-        except UserAlreadyExists:
+        except (DuplicateKeyError, UserAlreadyExists):
             print(f"User {email} already exists", flush=True)

     async def on_after_register_custom(
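
For the last bullet: fastapi-users raises UserAlreadyExists from its own existence check, but with a unique index on the users collection MongoDB can also raise DuplicateKeyError directly on insert (for instance when superuser init runs again against an existing database), so both are now caught. A minimal sketch of that failure mode; the database, collection, and field names are assumptions for illustration.

    import pymongo
    from pymongo.errors import DuplicateKeyError

    client = pymongo.MongoClient()                 # assumes a local mongod
    users = client["btrix-example"]["users"]       # hypothetical db/collection
    users.create_index("email", unique=True)

    users.insert_one({"email": "admin@example.com"})
    try:
        users.insert_one({"email": "admin@example.com"})   # second init attempt
    except DuplicateKeyError:
        print("User admin@example.com already exists")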