From 793611e5bb7af969629a8786e156490dd554df83 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Sat, 12 Nov 2022 17:24:30 -0800 Subject: [PATCH] add exclusion api, fixes #311 (#349) * add exclusion api, fixes #311 add new apis: `POST crawls/{crawl_id}/exclusion?regex=...` and `DELETE crawls/{crawl_id}/exclusion?regex=...` which will: - create new config with add 'regex' as exclusion (deleting or making inactive previous config) OR remove as exclusion. - update crawl to point to new config - update statefulset to point to new config, causing crawler pods to restart - filter out urls matching 'regex' from both queue and seen list (currently a bit slow) (when adding only) - return 400 if exclusion already existing when adding, or doesn't exist when removing - api reads redis list in reverse to match how exclusion queue is used --- backend/btrixcloud/crawl_job.py | 8 + backend/btrixcloud/crawlconfigs.py | 69 +++++++- backend/btrixcloud/crawlmanager.py | 5 + backend/btrixcloud/crawls.py | 158 +++++++++++++++--- backend/btrixcloud/k8s/crawl_job.py | 24 +++ .../btrixcloud/k8s/templates/crawl_job.yaml | 2 +- backend/btrixcloud/k8s/templates/crawler.yaml | 15 +- backend/btrixcloud/swarm/crawl_job.py | 3 + 8 files changed, 240 insertions(+), 44 deletions(-) diff --git a/backend/btrixcloud/crawl_job.py b/backend/btrixcloud/crawl_job.py index 502ce3ab..9ef439cf 100644 --- a/backend/btrixcloud/crawl_job.py +++ b/backend/btrixcloud/crawl_job.py @@ -394,6 +394,10 @@ class CrawlJob(ABC): async def healthz(): return {} + @app.post("/change_config/{cid}") + async def change_config(cid: str): + return await self._change_crawl_config(cid) + @abstractmethod async def init_job_objects(self, template, params): """base for creating objects""" @@ -414,6 +418,10 @@ class CrawlJob(ABC): async def _send_shutdown_signal(self, signame): """gracefully shutdown crawl""" + @abstractmethod + async def _change_crawl_config(self, cid): + """change crawl config for this crawl""" + @property @abstractmethod def redis_url(self): diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py index ce579142..03c446be 100644 --- a/backend/btrixcloud/crawlconfigs.py +++ b/backend/btrixcloud/crawlconfigs.py @@ -54,8 +54,8 @@ class RawCrawlConfig(BaseModel): scopeType: Optional[ScopeType] = ScopeType.PREFIX - include: Union[str, List[str], None] - exclude: Union[str, List[str], None] + include: Union[str, List[str], None] = None + exclude: Union[str, List[str], None] = None depth: Optional[int] = -1 limit: Optional[int] = 0 @@ -215,7 +215,11 @@ class CrawlConfigOps: return self._file_rx.sub("-", string.lower()) async def add_crawl_config( - self, config: CrawlConfigIn, archive: Archive, user: User + self, + config: CrawlConfigIn, + archive: Archive, + user: User, + for_running_crawl=False, ): """Add new crawl config""" data = config.dict() @@ -224,6 +228,9 @@ class CrawlConfigOps: data["_id"] = uuid.uuid4() data["created"] = datetime.utcnow().replace(microsecond=0, tzinfo=None) + if for_running_crawl: + data["crawlAttemptCount"] = 1 + profile_filename = None if config.profileid: profile_filename = await self.profiles.get_profile_storage_path( @@ -244,7 +251,9 @@ class CrawlConfigOps: async with await self.dbclient.start_session() as sesh: async with sesh.start_transaction(): if ( - await self.make_inactive_or_delete(old_config, data["_id"]) + await self.make_inactive_or_delete( + old_config, data["_id"], for_running_crawl=for_running_crawl + ) == "deleted" ): data["oldId"] = old_config.oldId @@ -365,7 +374,6 @@ 
class CrawlConfigOps: for res in results: config = CrawlConfigOut.from_dict(res) # pylint: disable=invalid-name - print("config", config.id, flush=True) config.currCrawlId = running.get(config.id) configs.append(config) @@ -440,7 +448,10 @@ class CrawlConfigOps: return config_cls.from_dict(res) async def make_inactive_or_delete( - self, crawlconfig: CrawlConfig, new_id: uuid.UUID = None + self, + crawlconfig: CrawlConfig, + new_id: uuid.UUID = None, + for_running_crawl=False, ): """Make config inactive if crawls exist, otherwise move to inactive list""" @@ -449,14 +460,21 @@ class CrawlConfigOps: if new_id: crawlconfig.newId = query["newId"] = new_id - if await self.get_running_crawl(crawlconfig): + is_running = await self.get_running_crawl(crawlconfig) is not None + + if is_running != for_running_crawl: raise HTTPException(status_code=400, detail="crawl_running_cant_deactivate") # set to either "deleted" or "deactivated" status = None + other_crawl_count = crawlconfig.crawlAttemptCount + # don't count current crawl, if for running crawl + if for_running_crawl: + other_crawl_count -= 1 + # if no crawls have been run, actually delete - if not crawlconfig.crawlAttemptCount and not crawlconfig.crawlCount: + if not other_crawl_count: result = await self.crawl_configs.delete_one( {"_id": crawlconfig.id, "aid": crawlconfig.aid} ) @@ -495,6 +513,41 @@ class CrawlConfigOps: return {"success": True, "status": status} + async def copy_add_remove_exclusion(self, regex, cid, archive, user, add=True): + """create a copy of existing crawl config, with added exclusion regex""" + # get crawl config + crawl_config = await self.get_crawl_config(cid, archive, active_only=False) + + # update exclusion + exclude = crawl_config.config.exclude or [] + if isinstance(exclude, str): + exclude = [exclude] + + if add: + if regex in exclude: + raise HTTPException(status_code=400, detail="exclusion_already_exists") + + exclude.append(regex) + else: + if regex not in exclude: + raise HTTPException(status_code=400, detail="exclusion_not_found") + + exclude.remove(regex) + + crawl_config.config.exclude = exclude + + # create new config + new_config = CrawlConfigIn(**crawl_config.serialize()) + + # pylint: disable=invalid-name + new_config.oldId = crawl_config.id + + result, _ = await self.add_crawl_config( + new_config, archive, user, for_running_crawl=True + ) + + return result.inserted_id + # ============================================================================ # pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py index 47707519..2e1983d7 100644 --- a/backend/btrixcloud/crawlmanager.py +++ b/backend/btrixcloud/crawlmanager.py @@ -145,6 +145,11 @@ class BaseCrawlManager(ABC): return await self._post_to_job(crawl_id, aid, f"/scale/{scale}") + async def change_crawl_config(self, crawl_id, aid, new_cid): + """Change crawl config and restart""" + + return await self._post_to_job(crawl_id, aid, f"/change_config/{new_cid}") + async def delete_crawl_configs_for_archive(self, archive): """Delete all crawl configs for given archive""" return await self._delete_crawl_configs(f"btrix.archive={archive}") diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index d8e0f74a..d4826f8b 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -94,8 +94,6 @@ class CrawlOut(Crawl): configName: Optional[str] resources: Optional[List[CrawlFileOut]] = [] - watchIPs: 
Optional[List[str]] = [] - # ============================================================================ class ListCrawlOut(BaseMongoModel): @@ -264,9 +262,6 @@ class CrawlOps: crawl = CrawlOut.from_dict(res) - # pylint: disable=invalid-name - crawl.watchIPs = [str(i) for i in range(crawl.scale)] - return await self._resolve_crawl_refs(crawl, archive) async def _resolve_crawl_refs( @@ -288,6 +283,7 @@ class CrawlOps: async def _resolve_signed_urls(self, files, archive: Archive): if not files: + print("no files") return delta = timedelta(seconds=self.presign_duration) @@ -328,6 +324,8 @@ class CrawlOps: if updates: asyncio.create_task(self._update_presigned(updates)) + print("presigned", out_files) + return out_files async def _update_presigned(self, updates): @@ -417,13 +415,10 @@ class CrawlOps: redis = None try: - redis = await aioredis.from_url( - self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True - ) - + redis = await self.get_redis(crawl_id) total = await redis.llen(f"{crawl_id}:q") - results = await redis.lrange(f"{crawl_id}:q", offset, count) - results = [json.loads(result)["url"] for result in results] + results = await redis.lrange(f"{crawl_id}:q", -offset - count, -offset - 1) + results = [json.loads(result)["url"] for result in reversed(results)] except exceptions.ConnectionError: # can't connect to redis, likely not initialized yet pass @@ -435,39 +430,130 @@ class CrawlOps: return {"total": total, "results": results, "matched": matched} - async def match_crawl_queue(self, crawl_id, regex): - """get crawl queue""" + async def iter_crawl_queue(self, regex, redis, crawl_id, total, step=50): + """iterate over urls that match regex in crawl queue list""" + async def match_crawl_queue(self, crawl_id, regex): + """get list of urls that match regex""" total = 0 + redis = None try: - redis = await aioredis.from_url( - self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True - ) - + redis = await self.get_redis(crawl_id) total = await redis.llen(f"{crawl_id}:q") except exceptions.ConnectionError: # can't connect to redis, likely not initialized yet pass - matched = [] regex = re.compile(regex) - + matched = [] step = 50 for count in range(0, total, step): - results = await redis.lrange(f"{crawl_id}:q", count, count + step) - for result in results: + results = await redis.lrange(f"{crawl_id}:q", -count - step, -count - 1) + for result in reversed(results): url = json.loads(result)["url"] if regex.search(url): matched.append(url) return {"total": total, "matched": matched} - def get_redis_url(self, crawl_id): + async def filter_crawl_queue(self, crawl_id, regex): + """filter out urls that match regex""" + total = 0 + redis = None + + q_key = f"{crawl_id}:q" + s_key = f"{crawl_id}:s" + + try: + redis = await self.get_redis(crawl_id) + total = await redis.llen(q_key) + except exceptions.ConnectionError: + # can't connect to redis, likely not initialized yet + pass + + dircount = -1 + regex = re.compile(regex) + step = 50 + + count = 0 + num_removed = 0 + + # pylint: disable=fixme + # todo: do this in a more efficient way? 
+ # currently quite inefficient as redis does not have a way + # to atomically check and remove value from list + # so removing each jsob block by value + while count < total: + if dircount == -1 and count > total / 2: + dircount = 1 + results = await redis.lrange(q_key, -count - step, -count - 1) + count += step + for result in reversed(results): + url = json.loads(result)["url"] + if regex.search(url): + await redis.srem(s_key, url) + res = await redis.lrem(q_key, dircount, result) + if res: + count -= res + num_removed += res + print(f"Removed {result}: {res}", flush=True) + + return num_removed + + async def get_redis(self, crawl_id): """get redis url for crawl id""" # pylint: disable=line-too-long - return f"redis://redis-{crawl_id}-0.redis-{crawl_id}.{self.namespace}.svc.cluster.local/0" + redis_url = f"redis://redis-{crawl_id}-0.redis-{crawl_id}.{self.namespace}.svc.cluster.local/0" + + return await aioredis.from_url( + redis_url, encoding="utf-8", decode_responses=True + ) + + async def add_exclusion(self, crawl_id, regex, archive, user): + """create new config with additional exclusion, copying existing config""" + + raw = await self.get_crawl_raw(crawl_id, archive) + + cid = raw.get("cid") + + new_cid = await self.crawl_configs.copy_add_remove_exclusion( + regex, cid, archive, user, add=True + ) + + await self.crawls.find_one_and_update( + {"_id": crawl_id}, {"$set": {"cid": new_cid}} + ) + + # restart crawl pods + change_c = self.crawl_manager.change_crawl_config(crawl_id, archive.id, new_cid) + + filter_q = self.filter_crawl_queue(crawl_id, regex) + + _, num_removed = await asyncio.gather(change_c, filter_q) + + return {"new_cid": new_cid, "num_removed": num_removed} + + async def remove_exclusion(self, crawl_id, regex, archive, user): + """create new config with exclusion removed, copying existing config""" + + raw = await self.get_crawl_raw(crawl_id, archive) + + cid = raw.get("cid") + + new_cid = await self.crawl_configs.copy_add_remove_exclusion( + regex, cid, archive, user, add=False + ) + + await self.crawls.find_one_and_update( + {"_id": crawl_id}, {"$set": {"cid": new_cid}} + ) + + # restart crawl pods + await self.crawl_manager.change_crawl_config(crawl_id, archive.id, new_cid) + + return {"new_cid": new_cid} # ============================================================================ @@ -595,6 +681,32 @@ def init_crawls_api( return await ops.match_crawl_queue(crawl_id, regex) + @app.post( + "/archives/{aid}/crawls/{crawl_id}/exclusions", + tags=["crawls"], + ) + async def add_exclusion( + crawl_id, + regex: str, + archive: Archive = Depends(archive_crawl_dep), + user: User = Depends(user_dep), + ): + + return await ops.add_exclusion(crawl_id, regex, archive, user) + + @app.delete( + "/archives/{aid}/crawls/{crawl_id}/exclusions", + tags=["crawls"], + ) + async def remove_exclusion( + crawl_id, + regex: str, + archive: Archive = Depends(archive_crawl_dep), + user: User = Depends(user_dep), + ): + + return await ops.remove_exclusion(crawl_id, regex, archive, user) + return ops diff --git a/backend/btrixcloud/k8s/crawl_job.py b/backend/btrixcloud/k8s/crawl_job.py index b1a6b491..6acd2874 100644 --- a/backend/btrixcloud/k8s/crawl_job.py +++ b/backend/btrixcloud/k8s/crawl_job.py @@ -45,6 +45,30 @@ class K8SCrawlJob(K8SJobMixin, CrawlJob): return await super().load_initial_scale() + async def _change_crawl_config(self, cid): + """patch existing crawl statefulset to use new crawlconfig id + this will cause the crawl to restart with new config""" + patch_config = { + 
"spec": { + "template": { + "spec": { + "volumes": [ + { + "name": "crawl-config", + "configMap": {"name": f"crawl-config-{cid}"}, + } + ] + } + } + } + } + + await self.apps_api.patch_namespaced_stateful_set( + name=f"crawl-{self.job_id}", namespace=self.namespace, body=patch_config + ) + + return {"success": True} + async def _get_crawl(self): try: return await self.apps_api.read_namespaced_stateful_set( diff --git a/backend/btrixcloud/k8s/templates/crawl_job.yaml b/backend/btrixcloud/k8s/templates/crawl_job.yaml index 53669c59..b51c015d 100644 --- a/backend/btrixcloud/k8s/templates/crawl_job.yaml +++ b/backend/btrixcloud/k8s/templates/crawl_job.yaml @@ -80,7 +80,7 @@ spec: valueFrom: configMapKeyRef: name: crawl-config-{{ cid }} - key: STORE_PATH + key: STORE_FILENAME - name: STORAGE_NAME valueFrom: diff --git a/backend/btrixcloud/k8s/templates/crawler.yaml b/backend/btrixcloud/k8s/templates/crawler.yaml index 398ae0d0..a508e4c9 100644 --- a/backend/btrixcloud/k8s/templates/crawler.yaml +++ b/backend/btrixcloud/k8s/templates/crawler.yaml @@ -292,22 +292,13 @@ spec: value: {{ redis_url }}/crawls-done - name: STORE_PATH - valueFrom: - configMapKeyRef: - name: crawl-config-{{ cid }} - key: STORE_PATH + value: {{ env.STORE_PATH }} - name: STORE_FILENAME - valueFrom: - configMapKeyRef: - name: crawl-config-{{ cid }} - key: STORE_FILENAME + value: {{ env.STORAGE_FILENAME }} - name: STORE_USER - valueFrom: - configMapKeyRef: - name: crawl-config-{{ cid }} - key: USER_ID + value: {{ env.USER_ID }} resources: limits: diff --git a/backend/btrixcloud/swarm/crawl_job.py b/backend/btrixcloud/swarm/crawl_job.py index bbafe6eb..19d82e87 100644 --- a/backend/btrixcloud/swarm/crawl_job.py +++ b/backend/btrixcloud/swarm/crawl_job.py @@ -101,6 +101,9 @@ class SwarmCrawlJob(SwarmJobMixin, CrawlJob): # likely fails as containers still shutting down # await loop.run_in_executor(None, delete_volumes, volumes) + async def _change_crawl_config(self, cid): + raise NotImplementedError("Not Supported") + # ============================================================================ @app.on_event("startup")