add exclusion api, fixes #311 (#349)

* add exclusion api, fixes #311
add new apis: `POST /archives/{aid}/crawls/{crawl_id}/exclusions?regex=...` and `DELETE /archives/{aid}/crawls/{crawl_id}/exclusions?regex=...` which will (see the example requests sketched below):
- create a new config with 'regex' added as an exclusion (deleting or deactivating the previous config), or with the exclusion removed
- update the crawl to point to the new config
- update the statefulset to point to the new config, causing the crawler pods to restart
- when adding only, filter urls matching 'regex' out of both the queue and the seen list (currently a bit slow)
- return 400 if the exclusion already exists when adding, or does not exist when removing
- the api reads the redis list in reverse to match how the exclusion queue is consumed
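
For reference, a minimal sketch of calling the new endpoints from Python; the base URL, archive id, crawl id, token, and regex below are placeholders, while the route paths, the `regex` query parameter, and the response keys (`new_cid`, `num_removed`) are taken from this change:

```python
import requests

BASE_URL = "https://btrix.example.com/api"  # placeholder deployment URL
AID = "my-archive-id"                       # placeholder archive id
CRAWL_ID = "my-crawl-id"                    # placeholder id of a running crawl
HEADERS = {"Authorization": "Bearer <access-token>"}  # placeholder auth token

# add an exclusion: creates a copy of the crawl config with the regex added,
# repoints the crawl and its statefulset, and filters matching urls out of
# the queue and seen list
resp = requests.post(
    f"{BASE_URL}/archives/{AID}/crawls/{CRAWL_ID}/exclusions",
    params={"regex": r"example\.com/skip"},
    headers=HEADERS,
)
print(resp.json())  # {"new_cid": ..., "num_removed": ...}

# remove the same exclusion: creates another config copy and restarts the
# crawler pods (queue filtering only happens when adding)
resp = requests.delete(
    f"{BASE_URL}/archives/{AID}/crawls/{CRAWL_ID}/exclusions",
    params={"regex": r"example\.com/skip"},
    headers=HEADERS,
)
print(resp.json())  # {"new_cid": ...}
```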
Ilya Kreymer 2022-11-12 17:24:30 -08:00 committed by GitHub
parent 95ec1599ef
commit 793611e5bb
8 changed files with 240 additions and 44 deletions

View File

@ -394,6 +394,10 @@ class CrawlJob(ABC):
async def healthz():
return {}
@app.post("/change_config/{cid}")
async def change_config(cid: str):
return await self._change_crawl_config(cid)
@abstractmethod
async def init_job_objects(self, template, params):
"""base for creating objects"""
@ -414,6 +418,10 @@ class CrawlJob(ABC):
async def _send_shutdown_signal(self, signame):
"""gracefully shutdown crawl"""
@abstractmethod
async def _change_crawl_config(self, cid):
"""change crawl config for this crawl"""
@property
@abstractmethod
def redis_url(self):

View File

@ -54,8 +54,8 @@ class RawCrawlConfig(BaseModel):
scopeType: Optional[ScopeType] = ScopeType.PREFIX
include: Union[str, List[str], None]
exclude: Union[str, List[str], None]
include: Union[str, List[str], None] = None
exclude: Union[str, List[str], None] = None
depth: Optional[int] = -1
limit: Optional[int] = 0
@ -215,7 +215,11 @@ class CrawlConfigOps:
return self._file_rx.sub("-", string.lower())
async def add_crawl_config(
self, config: CrawlConfigIn, archive: Archive, user: User
self,
config: CrawlConfigIn,
archive: Archive,
user: User,
for_running_crawl=False,
):
"""Add new crawl config"""
data = config.dict()
@ -224,6 +228,9 @@ class CrawlConfigOps:
data["_id"] = uuid.uuid4()
data["created"] = datetime.utcnow().replace(microsecond=0, tzinfo=None)
if for_running_crawl:
data["crawlAttemptCount"] = 1
profile_filename = None
if config.profileid:
profile_filename = await self.profiles.get_profile_storage_path(
@ -244,7 +251,9 @@ class CrawlConfigOps:
async with await self.dbclient.start_session() as sesh:
async with sesh.start_transaction():
if (
await self.make_inactive_or_delete(old_config, data["_id"])
await self.make_inactive_or_delete(
old_config, data["_id"], for_running_crawl=for_running_crawl
)
== "deleted"
):
data["oldId"] = old_config.oldId
@ -365,7 +374,6 @@ class CrawlConfigOps:
for res in results:
config = CrawlConfigOut.from_dict(res)
# pylint: disable=invalid-name
print("config", config.id, flush=True)
config.currCrawlId = running.get(config.id)
configs.append(config)
@ -440,7 +448,10 @@ class CrawlConfigOps:
return config_cls.from_dict(res)
async def make_inactive_or_delete(
self, crawlconfig: CrawlConfig, new_id: uuid.UUID = None
self,
crawlconfig: CrawlConfig,
new_id: uuid.UUID = None,
for_running_crawl=False,
):
"""Make config inactive if crawls exist, otherwise move to inactive list"""
@ -449,14 +460,21 @@ class CrawlConfigOps:
if new_id:
crawlconfig.newId = query["newId"] = new_id
if await self.get_running_crawl(crawlconfig):
is_running = await self.get_running_crawl(crawlconfig) is not None
if is_running != for_running_crawl:
raise HTTPException(status_code=400, detail="crawl_running_cant_deactivate")
# set to either "deleted" or "deactivated"
status = None
other_crawl_count = crawlconfig.crawlAttemptCount
# don't count current crawl, if for running crawl
if for_running_crawl:
other_crawl_count -= 1
# if no crawls have been run, actually delete
if not crawlconfig.crawlAttemptCount and not crawlconfig.crawlCount:
if not other_crawl_count:
result = await self.crawl_configs.delete_one(
{"_id": crawlconfig.id, "aid": crawlconfig.aid}
)
@ -495,6 +513,41 @@ class CrawlConfigOps:
return {"success": True, "status": status}
async def copy_add_remove_exclusion(self, regex, cid, archive, user, add=True):
"""create a copy of existing crawl config, with added exclusion regex"""
# get crawl config
crawl_config = await self.get_crawl_config(cid, archive, active_only=False)
# update exclusion
exclude = crawl_config.config.exclude or []
if isinstance(exclude, str):
exclude = [exclude]
if add:
if regex in exclude:
raise HTTPException(status_code=400, detail="exclusion_already_exists")
exclude.append(regex)
else:
if regex not in exclude:
raise HTTPException(status_code=400, detail="exclusion_not_found")
exclude.remove(regex)
crawl_config.config.exclude = exclude
# create new config
new_config = CrawlConfigIn(**crawl_config.serialize())
# pylint: disable=invalid-name
new_config.oldId = crawl_config.id
result, _ = await self.add_crawl_config(
new_config, archive, user, for_running_crawl=True
)
return result.inserted_id
# ============================================================================
# pylint: disable=redefined-builtin,invalid-name,too-many-locals,too-many-arguments

View File

@ -145,6 +145,11 @@ class BaseCrawlManager(ABC):
return await self._post_to_job(crawl_id, aid, f"/scale/{scale}")
async def change_crawl_config(self, crawl_id, aid, new_cid):
"""Change crawl config and restart"""
return await self._post_to_job(crawl_id, aid, f"/change_config/{new_cid}")
async def delete_crawl_configs_for_archive(self, archive):
"""Delete all crawl configs for given archive"""
return await self._delete_crawl_configs(f"btrix.archive={archive}")

View File

@ -94,8 +94,6 @@ class CrawlOut(Crawl):
configName: Optional[str]
resources: Optional[List[CrawlFileOut]] = []
watchIPs: Optional[List[str]] = []
# ============================================================================
class ListCrawlOut(BaseMongoModel):
@ -264,9 +262,6 @@ class CrawlOps:
crawl = CrawlOut.from_dict(res)
# pylint: disable=invalid-name
crawl.watchIPs = [str(i) for i in range(crawl.scale)]
return await self._resolve_crawl_refs(crawl, archive)
async def _resolve_crawl_refs(
@ -288,6 +283,7 @@ class CrawlOps:
async def _resolve_signed_urls(self, files, archive: Archive):
if not files:
print("no files")
return
delta = timedelta(seconds=self.presign_duration)
@ -328,6 +324,8 @@ class CrawlOps:
if updates:
asyncio.create_task(self._update_presigned(updates))
print("presigned", out_files)
return out_files
async def _update_presigned(self, updates):
@ -417,13 +415,10 @@ class CrawlOps:
redis = None
try:
redis = await aioredis.from_url(
self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True
)
redis = await self.get_redis(crawl_id)
total = await redis.llen(f"{crawl_id}:q")
results = await redis.lrange(f"{crawl_id}:q", offset, count)
results = [json.loads(result)["url"] for result in results]
results = await redis.lrange(f"{crawl_id}:q", -offset - count, -offset - 1)
results = [json.loads(result)["url"] for result in reversed(results)]
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass
@ -435,39 +430,130 @@ class CrawlOps:
return {"total": total, "results": results, "matched": matched}
async def match_crawl_queue(self, crawl_id, regex):
"""get crawl queue"""
async def iter_crawl_queue(self, regex, redis, crawl_id, total, step=50):
"""iterate over urls that match regex in crawl queue list"""
async def match_crawl_queue(self, crawl_id, regex):
"""get list of urls that match regex"""
total = 0
redis = None
try:
redis = await aioredis.from_url(
self.get_redis_url(crawl_id), encoding="utf-8", decode_responses=True
)
redis = await self.get_redis(crawl_id)
total = await redis.llen(f"{crawl_id}:q")
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass
matched = []
regex = re.compile(regex)
matched = []
step = 50
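# read the queue tail-first in chunks of `step` via negative lrange indices,
# reversing each chunk so matches come back in the order the queue is consumed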
for count in range(0, total, step):
results = await redis.lrange(f"{crawl_id}:q", count, count + step)
for result in results:
results = await redis.lrange(f"{crawl_id}:q", -count - step, -count - 1)
for result in reversed(results):
url = json.loads(result)["url"]
if regex.search(url):
matched.append(url)
return {"total": total, "matched": matched}
def get_redis_url(self, crawl_id):
async def filter_crawl_queue(self, crawl_id, regex):
"""filter out urls that match regex"""
total = 0
redis = None
q_key = f"{crawl_id}:q"
s_key = f"{crawl_id}:s"
try:
redis = await self.get_redis(crawl_id)
total = await redis.llen(q_key)
except exceptions.ConnectionError:
# can't connect to redis, likely not initialized yet
pass
dircount = -1
regex = re.compile(regex)
step = 50
count = 0
num_removed = 0
# pylint: disable=fixme
# todo: do this in a more efficient way?
# currently quite inefficient as redis does not have a way
# to atomically check and remove value from list
# so removing each json block by value
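# lrem scans from the head for a positive count and from the tail for a
# negative count; start from the tail (where iteration begins) and flip
# direction once past the halfway point so removals scan the shorter side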
while count < total:
if dircount == -1 and count > total / 2:
dircount = 1
results = await redis.lrange(q_key, -count - step, -count - 1)
count += step
for result in reversed(results):
url = json.loads(result)["url"]
if regex.search(url):
await redis.srem(s_key, url)
res = await redis.lrem(q_key, dircount, result)
if res:
count -= res
num_removed += res
print(f"Removed {result}: {res}", flush=True)
return num_removed
async def get_redis(self, crawl_id):
"""get redis url for crawl id"""
# pylint: disable=line-too-long
return f"redis://redis-{crawl_id}-0.redis-{crawl_id}.{self.namespace}.svc.cluster.local/0"
redis_url = f"redis://redis-{crawl_id}-0.redis-{crawl_id}.{self.namespace}.svc.cluster.local/0"
return await aioredis.from_url(
redis_url, encoding="utf-8", decode_responses=True
)
async def add_exclusion(self, crawl_id, regex, archive, user):
"""create new config with additional exclusion, copying existing config"""
raw = await self.get_crawl_raw(crawl_id, archive)
cid = raw.get("cid")
new_cid = await self.crawl_configs.copy_add_remove_exclusion(
regex, cid, archive, user, add=True
)
await self.crawls.find_one_and_update(
{"_id": crawl_id}, {"$set": {"cid": new_cid}}
)
# restart crawl pods
change_c = self.crawl_manager.change_crawl_config(crawl_id, archive.id, new_cid)
filter_q = self.filter_crawl_queue(crawl_id, regex)
_, num_removed = await asyncio.gather(change_c, filter_q)
return {"new_cid": new_cid, "num_removed": num_removed}
async def remove_exclusion(self, crawl_id, regex, archive, user):
"""create new config with exclusion removed, copying existing config"""
raw = await self.get_crawl_raw(crawl_id, archive)
cid = raw.get("cid")
new_cid = await self.crawl_configs.copy_add_remove_exclusion(
regex, cid, archive, user, add=False
)
await self.crawls.find_one_and_update(
{"_id": crawl_id}, {"$set": {"cid": new_cid}}
)
# restart crawl pods
await self.crawl_manager.change_crawl_config(crawl_id, archive.id, new_cid)
return {"new_cid": new_cid}
# ============================================================================
@ -595,6 +681,32 @@ def init_crawls_api(
return await ops.match_crawl_queue(crawl_id, regex)
@app.post(
"/archives/{aid}/crawls/{crawl_id}/exclusions",
tags=["crawls"],
)
async def add_exclusion(
crawl_id,
regex: str,
archive: Archive = Depends(archive_crawl_dep),
user: User = Depends(user_dep),
):
return await ops.add_exclusion(crawl_id, regex, archive, user)
@app.delete(
"/archives/{aid}/crawls/{crawl_id}/exclusions",
tags=["crawls"],
)
async def remove_exclusion(
crawl_id,
regex: str,
archive: Archive = Depends(archive_crawl_dep),
user: User = Depends(user_dep),
):
return await ops.remove_exclusion(crawl_id, regex, archive, user)
return ops

View File

@ -45,6 +45,30 @@ class K8SCrawlJob(K8SJobMixin, CrawlJob):
return await super().load_initial_scale()
async def _change_crawl_config(self, cid):
"""patch existing crawl statefulset to use new crawlconfig id
this will cause the crawl to restart with new config"""
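# changing the crawl-config volume in the pod template triggers a rolling
# restart of the crawler pods, which then mount the new configmap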
patch_config = {
"spec": {
"template": {
"spec": {
"volumes": [
{
"name": "crawl-config",
"configMap": {"name": f"crawl-config-{cid}"},
}
]
}
}
}
}
await self.apps_api.patch_namespaced_stateful_set(
name=f"crawl-{self.job_id}", namespace=self.namespace, body=patch_config
)
return {"success": True}
async def _get_crawl(self):
try:
return await self.apps_api.read_namespaced_stateful_set(

View File

@ -80,7 +80,7 @@ spec:
valueFrom:
configMapKeyRef:
name: crawl-config-{{ cid }}
key: STORE_PATH
key: STORE_FILENAME
- name: STORAGE_NAME
valueFrom:

View File

@ -292,22 +292,13 @@ spec:
value: {{ redis_url }}/crawls-done
- name: STORE_PATH
valueFrom:
configMapKeyRef:
name: crawl-config-{{ cid }}
key: STORE_PATH
value: {{ env.STORE_PATH }}
- name: STORE_FILENAME
valueFrom:
configMapKeyRef:
name: crawl-config-{{ cid }}
key: STORE_FILENAME
value: {{ env.STORAGE_FILENAME }}
- name: STORE_USER
valueFrom:
configMapKeyRef:
name: crawl-config-{{ cid }}
key: USER_ID
value: {{ env.USER_ID }}
resources:
limits:

View File

@ -101,6 +101,9 @@ class SwarmCrawlJob(SwarmJobMixin, CrawlJob):
# likely fails as containers still shutting down
# await loop.run_in_executor(None, delete_volumes, volumes)
async def _change_crawl_config(self, cid):
raise NotImplementedError("Not Supported")
# ============================================================================
@app.on_event("startup")