crawljob:

- support running an existing crawl job once on demand
- support updating/patching an existing crawl job with a new crawl config, a new schedule, and run-once
Ilya Kreymer 2021-08-21 22:10:31 -07:00
parent ea9010bf9a
commit a8255a76b2
2 changed files with 171 additions and 29 deletions
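As a usage sketch (not part of the commit), the two new endpoints can be called roughly as follows. The base URL, bearer token, ids, the /archives/{aid}/crawlconfigs mount point, and the payload shape are assumptions about the surrounding app, not taken from this diff:

    import requests

    # Hypothetical values: base URL, ids, and token are illustrative only.
    base = "https://btrix.example.com/api/archives/my-archive/crawlconfigs"
    headers = {"Authorization": "Bearer <token>"}
    cid = "my-config-id"

    # Replace an existing crawl config with a new config/schedule;
    # runNow=True would also start a crawl immediately after the update.
    payload = {
        "schedule": "0 0 * * *",
        "runNow": False,
        "config": {"seeds": ["https://example.com/"]},
    }
    resp = requests.patch(f"{base}/{cid}", json=payload, headers=headers)
    print(resp.json())  # {"updated": "<cid>"}

    # Trigger a one-off run of the existing config.
    resp = requests.post(f"{base}/{cid}/run", headers=headers)
    print(resp.json())  # {"started": "<job name>"}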


@@ -138,6 +138,21 @@ class CrawlOps:
         )

         return result

+    async def update_crawl_config(
+        self, cid: str, config: CrawlConfigIn, archive: Archive, user: User
+    ):
+        """ Update existing crawl config"""
+        data = config.dict()
+        data["archive"] = archive.id
+        data["user"] = user.id
+        data["_id"] = cid
+
+        await self.crawl_configs.find_one_and_replace({"_id": cid}, data)
+
+        crawlconfig = CrawlConfig.from_dict(data)
+
+        await self.crawl_manager.update_crawl_config(crawlconfig)
+
     async def get_crawl_configs(self, archive: Archive):
         """Get all crawl configs for an archive is a member of"""
         cursor = self.crawl_configs.find({"archive": archive.id})
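Worth noting: despite the PATCH verb on the endpoint added below, update_crawl_config uses find_one_and_replace, so the stored document is swapped out wholesale; the method compensates by re-injecting archive, user, and _id into the new data. A minimal sketch of the replace-vs-update distinction, using synchronous pymongo and hypothetical collection/field names for brevity:

    from pymongo import MongoClient

    coll = MongoClient()["btrix"]["crawl_configs"]  # assumed names
    coll.insert_one({"_id": "c1", "schedule": "0 0 * * *", "user": "u1"})

    # find_one_and_replace swaps the whole document: "user" is gone
    # unless the caller re-supplies it, as update_crawl_config does.
    coll.find_one_and_replace({"_id": "c1"}, {"_id": "c1", "schedule": ""})

    # update_one with $set would instead touch only the named field.
    coll.update_one({"_id": "c1"}, {"$set": {"schedule": ""}})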
@@ -205,6 +220,50 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
        res = await ops.add_crawl_config(config, archive, user)
        return {"added": str(res.inserted_id)}

+    @router.patch("/{cid}")
+    async def update_crawl_config(
+        config: CrawlConfigIn,
+        cid: str,
+        archive: Archive = Depends(archive_dep),
+        user: User = Depends(user_dep),
+    ):
+        if not archive.is_crawler(user):
+            raise HTTPException(
+                status_code=403, detail="User does not have permission to modify crawls"
+            )
+
+        try:
+            await ops.update_crawl_config(cid, config, archive, user)
+        except Exception as e:
+            # pylint: disable=raise-missing-from
+            raise HTTPException(
+                status_code=403, detail=f"Error updating crawl config: {e}"
+            )
+
+        return {"updated": cid}
+
+    @router.post("/{cid}/run")
+    async def run_now(
+        cid: str,
+        archive: Archive = Depends(archive_dep),
+        user: User = Depends(user_dep),
+    ):
+        if not archive.is_crawler(user):
+            raise HTTPException(
+                status_code=403, detail="User does not have permission to modify crawls"
+            )
+
+        crawl_id = None
+        try:
+            crawl_id = await crawl_manager.run_crawl_config(cid)
+        except Exception as e:
+            # pylint: disable=raise-missing-from
+            raise HTTPException(status_code=500, detail=f"Error starting crawl: {e}")
+
+        return {"started": crawl_id}
+
     @router.delete("")
     async def delete_crawl_configs(
         archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
@@ -217,16 +276,18 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
         result = await ops.delete_crawl_configs(archive)
         return {"deleted": result.deleted_count}

-    @router.delete("/{id}")
+    @router.delete("/{cid}")
     async def delete_crawl_config(
-        id: str, archive: Archive = Depends(archive_dep), user: User = Depends(user_dep)
+        cid: str,
+        archive: Archive = Depends(archive_dep),
+        user: User = Depends(user_dep),
     ):
         if not archive.is_crawler(user):
             raise HTTPException(
                 status_code=403, detail="User does not have permission to modify crawls"
             )

-        result = await ops.delete_crawl_config(id, archive)
+        result = await ops.delete_crawl_config(cid, archive)

         if not result or not result.deleted_count:
             raise HTTPException(status_code=404, detail="Crawl Config Not Found")


@ -78,19 +78,10 @@ class K8SManager:
"btrix.crawlconfig": cid, "btrix.crawlconfig": cid,
} }
extra_crawl_params = extra_crawl_params or []
# Create Config Map # Create Config Map
config_map = client.V1ConfigMap( config_map = self._create_config_map(crawlconfig, labels)
metadata={
"name": f"crawl-config-{cid}",
"namespace": self.namespace,
"labels": labels,
},
data={"crawl-config.json": json.dumps(crawlconfig.config.dict())},
)
api_response = await self.core_api.create_namespaced_config_map( await self.core_api.create_namespaced_config_map(
namespace=self.namespace, body=config_map namespace=self.namespace, body=config_map
) )
@ -115,21 +106,15 @@ class K8SManager:
}, },
) )
api_response = await self.core_api.create_namespaced_secret( await self.core_api.create_namespaced_secret(
namespace=self.namespace, body=crawl_secret namespace=self.namespace, body=crawl_secret
) )
# Create Cron Job # Create Cron Job
suspend = False
schedule = crawlconfig.schedule
if not schedule: suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)
schedule = DEFAULT_NO_SCHEDULE
suspend = True
run_now = False extra_crawl_params = extra_crawl_params or []
if crawlconfig.runNow:
run_now = True
job_template = self._get_job_template(cid, labels, extra_crawl_params) job_template = self._get_job_template(cid, labels, extra_crawl_params)
@ -151,15 +136,99 @@ class K8SManager:
spec=spec, spec=spec,
) )
api_response = await self.batch_beta_api.create_namespaced_cron_job( cron_job = await self.batch_beta_api.create_namespaced_cron_job(
namespace=self.namespace, body=cron_job namespace=self.namespace, body=cron_job
) )
# Run Job Now # Run Job Now
if run_now: if run_now:
await self._create_run_now_job(api_response, labels) await self._create_run_now_job(cron_job)
return api_response return cron_job
async def update_crawl_config(self, crawlconfig):
""" Update existing crawl config """
cid = crawlconfig.id
cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
)
if len(cron_jobs.items) != 1:
return
cron_job = cron_jobs.items[0]
if crawlconfig.archive != cron_job.metadata.labels["btrix.archive"]:
print("wrong archive")
return
labels = {
"btrix.user": cron_job.metadata.labels["btrix.user"],
"btrix.archive": crawlconfig.archive,
"btrix.crawlconfig": cid,
}
# Update Config Map
config_map = self._create_config_map(crawlconfig, labels)
await self.core_api.patch_namespaced_config_map(
name=f"crawl-config-{cid}", namespace=self.namespace, body=config_map
)
# Update CronJob, if needed
suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)
changed = False
if schedule != cron_job.spec.schedule:
cron_job.spec.schedule = schedule
changed = True
if suspend != cron_job.spec.suspend:
cron_job.spec.suspend = suspend
changed = True
if changed:
await self.batch_beta_api.patch_namespaced_cron_job(
name=cron_job.metadata.name, namespace=self.namespace, body=cron_job
)
# Run Job Now
if run_now:
await self._create_run_now_job(cron_job)
def _create_config_map(self, crawlconfig, labels):
""" Create Config Map based on CrawlConfig + labels """
config_map = client.V1ConfigMap(
metadata={
"name": f"crawl-config-{crawlconfig.id}",
"namespace": self.namespace,
"labels": labels,
},
data={"crawl-config.json": json.dumps(crawlconfig.config.dict())},
)
return config_map
#pylint: disable=no-self-use
def _get_schedule_suspend_run_now(self, crawlconfig):
""" get schedule/suspend/run_now data based on crawlconfig """
# Create Cron Job
suspend = False
schedule = crawlconfig.schedule
if not schedule:
schedule = DEFAULT_NO_SCHEDULE
suspend = True
run_now = False
if crawlconfig.runNow:
run_now = True
return suspend, schedule, run_now
async def delete_crawl_configs_for_archive(self, archive): async def delete_crawl_configs_for_archive(self, archive):
"""Delete all crawl configs for given archive""" """Delete all crawl configs for given archive"""
@@ -190,7 +259,16 @@ class K8SManager:
             propagation_policy="Foreground",
         )

-    async def _create_run_now_job(self, cron_job, labels):
+    async def run_crawl_config(self, cid):
+        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
+        cron_jobs = await self.batch_beta_api.list_namespaced_cron_job(
+            namespace=self.namespace, label_selector=f"btrix.crawlconfig={cid}"
+        )
+
+        res = await self._create_run_now_job(cron_jobs.items[0])
+        return res.metadata.name
+
+    async def _create_run_now_job(self, cron_job):
         """Create new job from cron job to run instantly"""
         annotations = {}
         annotations["cronjob.kubernetes.io/instantiate"] = "manual"
@ -204,10 +282,13 @@ class K8SManager:
api_version="batch/v1beta1", api_version="batch/v1beta1",
) )
ts_now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
name = f"crawl-now-{ts_now}-{cron_job.metadata.labels['btrix.crawlconfig']}"
object_meta = client.V1ObjectMeta( object_meta = client.V1ObjectMeta(
name=cron_job.metadata.name + "-run-now", name=name,
annotations=annotations, annotations=annotations,
labels=labels, labels=cron_job.metadata.labels,
owner_references=[owner_ref], owner_references=[owner_ref],
) )
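One more detail in the final hunk: the run-now job previously reused the fixed name <cronjob-name>-run-now, which would collide with an existing job on a second manual run; the timestamped name makes each run unique. The scheme, restated standalone:

    import datetime

    def run_now_job_name(cid: str) -> str:
        # same naming as the diff above: crawl-now-<UTC timestamp>-<config id>
        ts_now = datetime.datetime.utcnow().strftime("%Y%m%d%H%M%S")
        return f"crawl-now-{ts_now}-{cid}"

    print(run_now_job_name("abc123"))  # e.g. crawl-now-20210821221031-abc123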