backend: update queue apis to work with new sorted queue apis (also backwards compatible to existing apis) (#712)

* backend: update queue apis to work with new sorted queue apis (also backwards compatible to existing apis)
designed for browsertrix-crawler 0.9.0-beta.1, but backwards compatible with the older list-based queue as well
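
The backwards compatibility comes from trying the sorted-set command first and falling back to the corresponding list command when Redis answers with a WRONGTYPE error (surfaced by redis-py as a `ResponseError`). Below is a minimal sketch of that pattern, assuming redis-py's asyncio client; the key names and URL are illustrative, not taken from this change:

```python
# Minimal sketch of the zset-first / list-fallback pattern, assuming redis-py's
# asyncio client; key names and URL are illustrative only.
import asyncio
import json

from redis import asyncio as aioredis
from redis import exceptions


async def queue_len(redis, key):
    """Crawl queue length: try the sorted-set command, fall back to the list."""
    try:
        # new sorted queue (browsertrix-crawler 0.9.0-beta.1+)
        return await redis.zcard(key)
    except exceptions.ResponseError:
        # WRONGTYPE reply -> key is still a plain list from an older crawler
        return await redis.llen(key)


async def main():
    redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    entry = json.dumps({"url": "https://example.com/"})

    await redis.zadd("newcrawl:q", {entry: 1})   # sorted-set style queue
    await redis.rpush("oldcrawl:q", entry)       # legacy list style queue

    print(await queue_len(redis, "newcrawl:q"))  # 1, via ZCARD
    print(await queue_len(redis, "oldcrawl:q"))  # 1, via LLEN after the fallback
    await redis.close()


if __name__ == "__main__":
    asyncio.run(main())
```
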
Ilya Kreymer 2023-03-17 21:11:17 -07:00 committed by GitHub
parent b9a24fa5e2
commit 07e9f51292

@@ -502,6 +502,30 @@ class CrawlOps:
         # return whatever detail may be included in the response
         raise HTTPException(status_code=400, detail=result.get("error"))
 
+    async def _crawl_queue_len(self, redis, key):
+        try:
+            return await redis.zcard(key)
+        except exceptions.ResponseError:
+            # fallback to old crawler queue
+            return await redis.llen(key)
+
+    async def _crawl_queue_range(self, redis, key, offset, count):
+        try:
+            return await redis.zrangebyscore(key, 0, "inf", offset, count)
+        except exceptions.ResponseError:
+            # fallback to old crawler queue
+            return reversed(await redis.lrange(key, -offset - count, -offset - 1))
+
+    async def _crawl_queue_rem(self, redis, key, values, dircount=1):
+        try:
+            return await redis.zrem(key, *values)
+        except exceptions.ResponseError:
+            # fallback to old crawler queue
+            res = 0
+            for value in values:
+                res += await redis.lrem(key, dircount, value)
+            return res
+
     async def get_crawl_queue(self, crawl_id, offset, count, regex):
         """get crawl queue"""
@@ -511,9 +535,12 @@ class CrawlOps:
         try:
             redis = await self.get_redis(crawl_id)
-            total = await redis.llen(f"{crawl_id}:q")
-            results = await redis.lrange(f"{crawl_id}:q", -offset - count, -offset - 1)
-            results = [json.loads(result)["url"] for result in reversed(results)]
+            total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
+            results = await self._crawl_queue_range(
+                redis, f"{crawl_id}:q", offset, count
+            )
+            results = [json.loads(result)["url"] for result in results]
         except exceptions.ConnectionError:
             # can't connect to redis, likely not initialized yet
             pass
@@ -525,9 +552,6 @@ class CrawlOps:
         return {"total": total, "results": results, "matched": matched}
 
-    async def iter_crawl_queue(self, regex, redis, crawl_id, total, step=50):
-        """iterate over urls that match regex in crawl queue list"""
-
     async def match_crawl_queue(self, crawl_id, regex):
         """get list of urls that match regex"""
         total = 0
@@ -535,7 +559,7 @@ class CrawlOps:
         try:
             redis = await self.get_redis(crawl_id)
-            total = await redis.llen(f"{crawl_id}:q")
+            total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
         except exceptions.ConnectionError:
             # can't connect to redis, likely not initialized yet
             pass
@@ -545,8 +569,8 @@ class CrawlOps:
         step = 50
 
         for count in range(0, total, step):
-            results = await redis.lrange(f"{crawl_id}:q", -count - step, -count - 1)
-            for result in reversed(results):
+            results = await self._crawl_queue_range(redis, f"{crawl_id}:q", count, step)
+            for result in results:
                 url = json.loads(result)["url"]
                 if regex.search(url):
                     matched.append(url)
@@ -555,6 +579,7 @@ class CrawlOps:
     async def filter_crawl_queue(self, crawl_id, regex):
         """filter out urls that match regex"""
+        # pylint: disable=too-many-locals
         total = 0
         redis = None
@@ -563,7 +588,7 @@ class CrawlOps:
         try:
             redis = await self.get_redis(crawl_id)
-            total = await redis.llen(q_key)
+            total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
         except exceptions.ConnectionError:
             # can't connect to redis, likely not initialized yet
             pass
@@ -583,17 +608,29 @@ class CrawlOps:
         while count < total:
             if dircount == -1 and count > total / 2:
                 dircount = 1
-            results = await redis.lrange(q_key, -count - step, -count - 1)
+            results = await self._crawl_queue_range(redis, q_key, count, step)
             count += step
-            for result in reversed(results):
+
+            qrems = []
+            srems = []
+
+            for result in results:
                 url = json.loads(result)["url"]
                 if regex.search(url):
-                    await redis.srem(s_key, url)
-                    res = await redis.lrem(q_key, dircount, result)
-                    if res:
-                        count -= res
-                        num_removed += res
-                        print(f"Removed {result}: {res}", flush=True)
+                    srems.append(url)
+                    # await redis.srem(s_key, url)
+                    # res = await self._crawl_queue_rem(redis, q_key, result, dircount)
+                    qrems.append(result)
+
+            if not srems:
+                continue
+
+            await redis.srem(s_key, *srems)
+            res = await self._crawl_queue_rem(redis, q_key, qrems, dircount)
+            if res:
+                count -= res
+                num_removed += res
+                print(f"Removed {res} from queue", flush=True)
 
         return num_removed
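
Besides switching to the helper methods, filter_crawl_queue now removes matching entries in batches: the URLs and queue entries for each page of results are collected first, then dropped with a single SREM and a single ZREM (or a per-entry LREM when falling back to the old list queue). A standalone sketch of that batching, again assuming redis-py's asyncio client and illustrative names:

```python
# Standalone sketch of the batched removal step, assuming redis-py's asyncio
# client; the q_key/s_key parameters and helper name are illustrative.
import json

from redis import exceptions


async def remove_matching(redis, q_key, s_key, regex, results, dircount=1):
    """Drop queue entries whose URL matches regex; returns number removed."""
    qrems = []  # full JSON queue entries to remove from the queue
    srems = []  # bare URLs to remove from the "seen" set
    for result in results:
        url = json.loads(result)["url"]
        if regex.search(url):
            qrems.append(result)
            srems.append(url)

    if not srems:
        return 0

    # one SREM for the whole page of matches
    await redis.srem(s_key, *srems)
    try:
        # one ZREM on the new sorted queue
        return await redis.zrem(q_key, *qrems)
    except exceptions.ResponseError:
        # old list-based queue: LREM takes one value at a time, so loop
        removed = 0
        for result in qrems:
            removed += await redis.lrem(q_key, dircount, result)
        return removed
```

Returning the removed count lets the caller adjust its scan position and running total (count -= res, num_removed += res) the same way the updated method above does.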