Ensure Volumes are deleted when crawl is canceled (#828)
operator:
- ensure crawler PVCs are always deleted before the crawl object is finalized (fixes #827)
- refactor to ensure the finalizer handler always runs when finalizing
- remove obsolete config entries
commit aae0e6590e
parent 48d34bc3c4
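Note: the operator here is a Metacontroller hook server, and the fix leans on Metacontroller's finalize contract: once a CrawlJob is being deleted, the finalize hook is invoked repeatedly until it responds with "finalized": true, and only then is the finalizer removed and the object actually deleted. A minimal sketch of that contract (the function and parameter names are illustrative, not from this repo):

    def finalize_hook(pvcs_remaining: int) -> dict:
        """Return finalized=True only once cleanup is complete; until then,
        the pending finalizer keeps the CrawlJob object alive."""
        return {"children": [], "finalized": pvcs_remaining == 0}

    print(finalize_hook(2))  # {'children': [], 'finalized': False} -> deletion held back
    print(finalize_hook(0))  # {'children': [], 'finalized': True}  -> deletion proceeds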
@@ -24,10 +24,6 @@ class CrawlManager(K8sAPI):
         self.job_image = os.environ["JOB_IMAGE"]
         self.job_image_pull_policy = os.environ.get("JOB_PULL_POLICY", "Always")
 
-        self.no_delete_jobs = os.environ.get("NO_DELETE_JOBS", "0") != "0"
-
-        self.crawler_node_type = os.environ.get("CRAWLER_NODE_TYPE", "")
-
         self.cron_namespace = os.environ.get("CRON_NAMESPACE", "default")
 
         self._default_storages = {}
@@ -30,6 +30,7 @@ from .crawls import (
 
 STS = "StatefulSet.apps/v1"
 CMAP = "ConfigMap.v1"
+PVC = "PersistentVolumeClaim.v1"
 
 DEFAULT_TTL = 30
 
@@ -140,18 +141,29 @@ class BtrixOperator(K8sAPI):
         scale = spec.get("scale", 1)
         status.scale = scale
 
+        redis_url = self.get_redis_url(crawl_id)
+
+        # if finalizing, crawl is being deleted
+        if data.finalizing:
+            # if not yet finished, assume it was canceled, mark as such
+            if not status.finished:
+                await self.cancel_crawl(redis_url, crawl_id, status, "canceled")
+
+            return await self.finalize_crawl(crawl_id, status, data.related)
+
         if status.finished:
             return await self.handle_finished_delete_if_needed(crawl_id, status, spec)
 
         cid = spec["cid"]
 
-        redis_url = self.get_redis_url(crawl_id)
-
         try:
             configmap = data.related[CMAP][f"crawl-config-{cid}"]["data"]
         # pylint: disable=bare-except, broad-except
         except:
-            return await self.cancel_crawl(redis_url, crawl_id, status, "failed")
+            # fail crawl if config somehow missing, shouldn't generally happen
+            await self.cancel_crawl(redis_url, crawl_id, status, "failed")
+
+            return self._done_response(status)
 
         crawl = CrawlSpec(
             id=crawl_id,
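Note: the important reordering in this hunk is that the finalizing branch now runs before the finished check and the configmap lookup, so no earlier return can skip cleanup. A condensed, runnable sketch of the resulting control flow (stub coroutines standing in for the real methods):

    import asyncio

    async def cancel_crawl(state):
        print(f"mark crawl as {state}")

    async def finalize_crawl():
        print("delete PVCs; report finalized only once they are gone")
        return "finalize response"

    async def sync(finalizing: bool, finished: bool):
        if finalizing:              # deletion requested: cleanup always reachable
            if not finished:        # never finished, so record it as canceled
                await cancel_crawl("canceled")
            return await finalize_crawl()
        if finished:
            return "handle_finished_delete_if_needed"
        return "normal sync: read configmap, reconcile children"

    print(asyncio.run(sync(finalizing=True, finished=False)))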
@@ -165,10 +177,6 @@ class BtrixOperator(K8sAPI):
             expire_time=from_k8s_date(spec.get("expireTime")),
         )
 
-        # if finalizing and not finished, job is being deleted, so assume crawl has been canceled
-        if data.finalizing:
-            return await self.cancel_crawl(redis_url, crawl_id, status, "canceled")
-
         crawl_sts = f"crawl-{crawl_id}"
         redis_sts = f"redis-{crawl_id}"
 
@@ -179,7 +187,7 @@ class BtrixOperator(K8sAPI):
             status.state = "starting"
 
         if status.finished:
-            return await self.handle_finished_delete_if_needed(crawl.id, status, spec)
+            return await self.handle_finished_delete_if_needed(crawl_id, status, spec)
 
         params = {}
         params.update(self.shared_params)
@@ -223,14 +231,20 @@ class BtrixOperator(K8sAPI):
     def get_related(self, data: MCBaseRequest):
         """return configmap related to crawl"""
         spec = data.parent.get("spec", {})
-        cid = spec.get("cid")
+        cid = spec["cid"]
+        crawl_id = spec["id"]
         return {
             "relatedResources": [
                 {
                     "apiVersion": "v1",
                     "resource": "configmaps",
                     "labelSelector": {"matchLabels": {"btrix.crawlconfig": cid}},
-                }
+                },
+                {
+                    "apiVersion": "v1",
+                    "resource": "persistentvolumeclaims",
+                    "labelSelector": {"matchLabels": {"crawl": crawl_id}},
+                },
             ]
         }
 
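Note: with the second relatedResources rule above, Metacontroller delivers the crawl's PVCs alongside the configmap in data.related, keyed by "Kind.version" and then by object name (the same shape the CMAP lookup already relies on). A fabricated example of that payload, with invented object names:

    PVC = "PersistentVolumeClaim.v1"

    related = {
        "ConfigMap.v1": {
            "crawl-config-cid123": {"data": {"INITIAL_SCALE": "1"}},
        },
        PVC: {
            "crawl-data-crawl-abc-0": {"metadata": {"labels": {"crawl": "abc"}}},
            "crawl-data-crawl-abc-1": {"metadata": {"labels": {"crawl": "abc"}}},
        },
    }

    # finalize_crawl() below only needs the names:
    print(list(related[PVC].keys()))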
@@ -241,17 +255,15 @@ class BtrixOperator(K8sAPI):
 
         ttl = spec.get("ttlSecondsAfterFinished", DEFAULT_TTL)
         finished = from_k8s_date(status.finished)
-        if (dt_now() - finished).total_seconds() > ttl:
+        if (dt_now() - finished).total_seconds() > ttl > 0:
             print("Job expired, deleting: " + crawl_id)
 
             asyncio.create_task(self.delete_crawl_job(crawl_id))
 
         return self._done_response(status)
 
-    async def delete_crawl_job(self, crawl_id):
-        # delete the crawljob itself
-        await super().delete_crawl_job(crawl_id)
-
+    async def delete_pvc(self, crawl_id):
+        """delete all pvcs for crawl"""
         # until delete policy is supported in StatefulSet
         # now, delete pvcs explicitly
         # (don't want to make them children as already owned by sts)
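Note: the expiry test becomes a chained comparison, which in Python reads as "(elapsed > ttl) and (ttl > 0)". A non-positive ttlSecondsAfterFinished therefore appears to disable this auto-delete path rather than trigger it immediately:

    def job_expired(elapsed_seconds: float, ttl: int) -> bool:
        # a > b > c is equivalent to (a > b) and (b > c)
        return elapsed_seconds > ttl > 0

    print(job_expired(60, 30))  # True: 60s elapsed, 30s TTL
    print(job_expired(60, 0))   # False: ttl <= 0 never expires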
@@ -263,20 +275,34 @@ class BtrixOperator(K8sAPI):
         except Exception as exc:
             print("PVC Delete failed", exc, flush=True)
 
+    # pylint: disable=too-many-arguments
     async def cancel_crawl(self, redis_url, crawl_id, status, state):
         """immediately cancel crawl with specified state"""
         redis = await self._get_redis(redis_url)
         await self.mark_finished(redis, crawl_id, status, state)
         return self._done_response(status)
 
-    def _done_response(self, status):
-        """response for when crawl job is done/to be deleted"""
+    def _done_response(self, status, finalized=False):
+        """done response for removing crawl"""
         return {
             "status": status.dict(exclude_none=True),
             "children": [],
-            "finalized": True,
+            "finalized": finalized,
         }
 
+    async def finalize_crawl(self, crawl_id, status, related):
+        """ensure crawl id ready for deletion
+        return with finalized state"""
+        pvcs = list(related[PVC].keys())
+        if pvcs:
+            print("Deleting PVCs", pvcs)
+            await self.delete_pvc(crawl_id)
+            finalized = False
+        else:
+            finalized = True
+
+        return self._done_response(status, finalized)
+
     async def _get_redis(self, redis_url):
         """init redis, ensure connectivity"""
         redis = None
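Note: finalize_crawl() returns finalized=False while any PVCs remain and relies on Metacontroller re-invoking the finalize hook, so deletion converges over successive syncs. A toy, in-memory simulation of that convergence (no Kubernetes involved; one PVC disappears per pass):

    pvcs = {"pvc-0": None, "pvc-1": None}  # stand-in for related[PVC]

    def finalize_step(remaining: dict) -> dict:
        """One finalize pass: trigger deletion if PVCs remain, else report done."""
        if remaining:
            remaining.popitem()  # stand-in for the async delete_pvc() call
            return {"finalized": False}
        return {"finalized": True}

    while True:  # Metacontroller's retry-until-finalized behavior, condensed
        resp = finalize_step(pvcs)
        print(resp, "PVCs left:", len(pvcs))
        if resp["finalized"]:
            break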
@@ -447,20 +473,29 @@ class BtrixOperator(K8sAPI):
     async def add_crawl_errors_to_db(self, redis, crawl_id, inc=100):
         """Pull crawl errors from redis and write to mongo db"""
         index = 0
-        while True:
-            skip = index * inc
-            upper_bound = skip + inc - 1
-            errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
-            if not errors:
-                break
+        try:
+            # ensure this only runs once
+            if not await redis.setnx("errors-exported", "1"):
+                return
 
-            await add_crawl_errors(self.crawls, crawl_id, errors)
+            while True:
+                skip = index * inc
+                upper_bound = skip + inc - 1
+                errors = await redis.lrange(f"{crawl_id}:e", skip, upper_bound)
+                if not errors:
+                    break
 
-            if len(errors) < inc:
-                # If we have fewer than inc errors, we can assume this is the
-                # last page of data to add.
-                break
-            index += 1
+                await add_crawl_errors(self.crawls, crawl_id, errors)
+
+                if len(errors) < inc:
+                    # If we have fewer than inc errors, we can assume this is the
+                    # last page of data to add.
+                    break
+                index += 1
+
+        # likely redis has already been deleted, so nothing to do
+        # pylint: disable=bare-except
+        except:
+            return
 
 
 # ============================================================================
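Note: the setnx call is a once-only guard: SETNX succeeds only for the first caller, so repeated operator syncs cannot write the same errors twice. A self-contained sketch with an in-memory stand-in for redis:

    import asyncio

    class FakeRedis:
        """In-memory stand-in exposing just the setnx semantics used above."""
        def __init__(self):
            self.store = {}

        async def setnx(self, key, value):
            if key in self.store:
                return False  # key already set: a previous sync won
            self.store[key] = value
            return True       # first caller wins the guard

    async def export_errors_once(redis):
        if not await redis.setnx("errors-exported", "1"):
            return "skipped (already exported)"
        return "exported"

    redis = FakeRedis()
    print(asyncio.run(export_errors_once(redis)))  # exported
    print(asyncio.run(export_errors_once(redis)))  # skipped (already exported)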
@@ -38,14 +38,10 @@ data:
   CRAWLER_PV_CLAIM: "{{ .Values.crawler_pv_claim }}"
   {{- end }}
 
-  CRAWLER_NODE_TYPE: "{{ .Values.crawler_node_type }}"
-
   REDIS_URL: "{{ .Values.redis_url }}"
 
   REDIS_CRAWLS_DONE_KEY: "crawls-done"
 
-  NO_DELETE_JOBS: "{{ .Values.no_delete_jobs | default 0 }}"
-
   GRACE_PERIOD_SECS: "{{ .Values.grace_period_secs | default 600 }}"
 
   REGISTRATION_ENABLED: "{{ .Values.registration_enabled | default 0 }}"
@@ -180,11 +180,6 @@ crawler_liveness_port: 6065
 grace_period: 1000
 
 
-
-# debug
-no_delete_jobs: 0
-
-
 # Local Minio Pod (optional)
 # =========================================
 # set to true to use a local minio image