From 0935d43a97edb9f3093d29455112187b664bd35a Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Mon, 6 Nov 2023 09:36:25 -0800
Subject: [PATCH] exclusion optimizations: dynamic exclusions (part of #1216) (#1268)

- instead of restarting the crawler when an exclusion is added or removed,
  push a message to a Redis list (one list per crawler instance)
- no longer filter the existing queue on the backend; this is now handled by
  the crawler (implemented in 0.12.0 via webrecorder/browsertrix-crawler#408)
- match response optimization: instead of returning the first 1000 matches,
  limit the response to 500K and return however many matches fit in that
  size (allowing optional pagination on the frontend)
---
 backend/btrixcloud/crawls.py                | 115 ++++----------------
 frontend/src/components/crawl-queue.ts      |  30 +++++
 frontend/src/components/exclusion-editor.ts |  26 ++++-
 frontend/src/pages/org/workflow-detail.ts   |  21 +++-
 4 files changed, 93 insertions(+), 99 deletions(-)

diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index d19769e6..8d877d0a 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -1,7 +1,6 @@
 """ Crawl API """
 # pylint: disable=too-many-lines
 
-import asyncio
 import json
 import re
 import urllib.parse
@@ -34,6 +33,10 @@ from .models import (
 )
 
 
+MAX_MATCH_SIZE = 500000
+DEFAULT_RANGE_LIMIT = 50
+
+
 # ============================================================================
 class CrawlOps(BaseCrawlOps):
     """Crawl Ops"""
@@ -306,16 +309,6 @@ class CrawlOps(BaseCrawlOps):
             # fallback to old crawler queue
             return reversed(await redis.lrange(key, -offset - count, -offset - 1))
 
-    async def _crawl_queue_rem(self, redis, key, values, dircount=1):
-        try:
-            return await redis.zrem(key, *values)
-        except exceptions.ResponseError:
-            # fallback to old crawler queue
-            res = 0
-            for value in values:
-                res += await redis.lrem(key, dircount, value)
-            return res
-
     async def get_crawl_queue(self, crawl_id, offset, count, regex):
         """get crawl queue"""
 
@@ -345,13 +338,13 @@ class CrawlOps(BaseCrawlOps):
 
         return {"total": total, "results": results, "matched": matched}
 
-    async def match_crawl_queue(self, crawl_id, regex, offset=0, limit=1000):
-        """get list of urls that match regex, starting at offset and at most around 'limit'.
-        (limit rounded to next step boundary, so limit <= next_offset < limit + step"""
+    async def match_crawl_queue(self, crawl_id, regex, offset=0):
+        """get list of urls that match the regex, starting at 'offset'; the response
+        is capped at MAX_MATCH_SIZE (next_offset rounds up to the next 'step' boundary)"""
 
         total = 0
         matched = []
 
-        step = 50
+        step = DEFAULT_RANGE_LIMIT
 
         async with self.get_redis(crawl_id) as redis:
             try:
@@ -366,6 +359,7 @@ class CrawlOps(BaseCrawlOps):
                 raise HTTPException(status_code=400, detail="invalid_regex") from exc
 
             next_offset = -1
+            size = 0
 
             for count in range(offset, total, step):
                 results = await self._crawl_queue_range(
@@ -374,75 +368,17 @@ class CrawlOps(BaseCrawlOps):
                 for result in results:
                     url = json.loads(result)["url"]
                     if regex.search(url):
+                        size += len(url)
                         matched.append(url)
 
-                        # if exceeded limit set nextOffset to next step boundary
+                        # if size of match response exceeds size limit, set nextOffset
                         # and break
-                        if len(matched) >= limit:
+                        if size > MAX_MATCH_SIZE:
                             next_offset = count + step
                             break
 
         return {"total": total, "matched": matched, "nextOffset": next_offset}
 
-    async def filter_crawl_queue(self, crawl_id, regex):
-        """filter out urls that match regex"""
-        # pylint: disable=too-many-locals
-        total = 0
-        q_key = f"{crawl_id}:q"
-        s_key = f"{crawl_id}:s"
-        step = 50
-        num_removed = 0
-
-        async with self.get_redis(crawl_id) as redis:
-            try:
-                total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
-            except exceptions.ConnectionError:
-                # can't connect to redis, likely not initialized yet
-                pass
-
-            dircount = -1
-
-            try:
-                regex = re.compile(regex)
-            except re.error as exc:
-                raise HTTPException(status_code=400, detail="invalid_regex") from exc
-
-            count = 0
-
-            # pylint: disable=fixme
-            # todo: do this in a more efficient way?
-            # currently quite inefficient as redis does not have a way
-            # to atomically check and remove value from list
-            # so removing each jsob block by value
-            while count < total:
-                if dircount == -1 and count > total / 2:
-                    dircount = 1
-                results = await self._crawl_queue_range(redis, q_key, count, step)
-                count += step
-
-                qrems = []
-                srems = []
-
-                for result in results:
-                    url = json.loads(result)["url"]
-                    if regex.search(url):
-                        srems.append(url)
-                        # await redis.srem(s_key, url)
-                        # res = await self._crawl_queue_rem(redis, q_key, result, dircount)
-                        qrems.append(result)
-
-                if not srems:
-                    continue
-
-                await redis.srem(s_key, *srems)
-                res = await self._crawl_queue_rem(redis, q_key, qrems, dircount)
-                if res:
-                    count -= res
-                    num_removed += res
-                    print(f"Removed {res} from queue", flush=True)
-
-        return num_removed
-
     async def get_errors_from_redis(
         self, crawl_id: str, page_size: int = DEFAULT_PAGE_SIZE, page: int = 1
     ):
@@ -471,6 +407,18 @@ class CrawlOps(BaseCrawlOps):
 
         cid = crawl_raw.get("cid")
 
+        scale = crawl_raw.get("scale", 1)
+
+        async with self.get_redis(crawl_id) as redis:
+            query = {
+                "regex": regex,
+                "type": "addExclusion" if add else "removeExclusion",
+            }
+            query_str = json.dumps(query)
+
+            for i in range(0, scale):
+                await redis.rpush(f"crawl-{crawl_id}-{i}:msg", query_str)
+
         new_config = await self.crawl_configs.add_or_remove_exclusion(
             regex, cid, org, user, add
         )
@@ -480,21 +428,7 @@ class CrawlOps(BaseCrawlOps):
             {"$set": {"config": new_config.dict()}},
         )
 
-        resp = {"success": True}
-
-        # restart crawl pods
-        restart_c = self.crawl_manager.rollover_restart_crawl(crawl_id)
-
-        if add:
-            filter_q = self.filter_crawl_queue(crawl_id, regex)
-
-            _, num_removed = await asyncio.gather(restart_c, filter_q)
-            resp["num_removed"] = num_removed
-
-        else:
-            await restart_c
-
-        return resp
+        return {"success": True}
 
     async def update_crawl_state_if_allowed(
         self, crawl_id, state, allowed_from, **kwargs
@@ -815,12 +749,11 @@ def init_crawls_api(app, user_dep, *args):
         crawl_id,
         regex: str,
         offset: int = 0,
-        limit: int = 1000,
         org: Organization = Depends(org_crawl_dep),
     ):
         await ops.get_crawl_raw(crawl_id, org)
 
-        return await ops.match_crawl_queue(crawl_id, regex, offset, limit)
+        return await ops.match_crawl_queue(crawl_id, regex, offset)
 
     @app.post(
         "/orgs/{oid}/crawls/{crawl_id}/exclusions",
diff --git a/frontend/src/components/crawl-queue.ts b/frontend/src/components/crawl-queue.ts
index 26f61bec..597f5743 100644
--- a/frontend/src/components/crawl-queue.ts
+++ b/frontend/src/components/crawl-queue.ts
@@ -5,6 +5,7 @@ import throttle from "lodash/fp/throttle";
 
 import LiteElement, { html } from "../utils/LiteElement";
 import type { AuthState } from "../utils/AuthService";
+import { PropertyValueMap } from "lit";
 
 type Pages = string[];
 type ResponseData = {
@@ -46,6 +47,12 @@ export class CrawlQueue extends LiteElement {
   /** `new RegExp` constructor string */
   regex: string = "";
 
+  @property({ type: Array })
+  private exclusions: string[] = [];
+
+  @state()
+  private exclusionsRx: RegExp[] = [];
+
   @state()
   private queue?: ResponseData;
 
@@ -62,6 +69,16 @@ export class CrawlQueue extends LiteElement {
     super.disconnectedCallback();
   }
 
+  protected updated(
+    changedProperties: PropertyValueMap<any> | Map<PropertyKey, unknown>
+  ): void {
+    if (changedProperties.has("exclusions")) {
+      this.exclusionsRx = this.exclusions
+        ? this.exclusions.map((x) => new RegExp(x))
+        : [];
+    }
+  }
+
   willUpdate(changedProperties: Map<string, any>) {
     if (
       changedProperties.has("authState") ||
@@ -104,6 +121,7 @@ export class CrawlQueue extends LiteElement {
         ${this.queue.results.map((url, idx) => {
           const isMatch = this.queue!.matched.some((v) => v === url);
+          const isExcluded = !isMatch && this.isExcluded(url);
           return html`
 {
     const offset = "0";
     const count = this.pageSize.toString();
diff --git a/frontend/src/components/exclusion-editor.ts b/frontend/src/components/exclusion-editor.ts
index 08adc375..d826a785 100644
--- a/frontend/src/components/exclusion-editor.ts
+++ b/frontend/src/components/exclusion-editor.ts
@@ -135,6 +135,7 @@ export class ExclusionEditor extends LiteElement {
       crawlId=${this.crawlId!}
       .authState=${this.authState}
       regex=${this.regex}
+      .exclusions=${(this.config && this.config.exclude) || []}
       matchedTotal=${this.matchedURLs?.length || 0}
     >`;
   }
@@ -224,10 +225,21 @@ export class ExclusionEditor extends LiteElement {
     return data;
   }
 
-  private async handleAddRegex(e: ExclusionAddEvent) {
+  async handleAddRegex(e?: ExclusionAddEvent) {
     this.isSubmitting = true;
 
-    const { regex, onSuccess } = e.detail;
+    let regex = null;
+    let onSuccess = null;
+
+    if (e) {
+      ({ regex, onSuccess } = e.detail);
+    } else {
+      // if not provided, use current regex, if set
+      if (!this.regex) {
+        return;
+      }
+      regex = this.regex;
+    }
 
     try {
       const params = new URLSearchParams({ regex });
@@ -250,7 +262,9 @@ export class ExclusionEditor extends LiteElement {
         this.matchedURLs = null;
         await this.updateComplete;
 
-        onSuccess();
+        if (onSuccess) {
+          onSuccess();
+        }
         this.dispatchEvent(new CustomEvent("on-success"));
       } else {
         throw data;
@@ -271,4 +285,10 @@ export class ExclusionEditor extends LiteElement {
 
     this.isSubmitting = false;
   }
+
+  async onClose() {
+    if (this.regex && this.isActiveCrawl) {
+      await this.handleAddRegex();
+    }
+  }
 }
diff --git a/frontend/src/pages/org/workflow-detail.ts b/frontend/src/pages/org/workflow-detail.ts
index 8a78df27..04a28ccd 100644
--- a/frontend/src/pages/org/workflow-detail.ts
+++ b/frontend/src/pages/org/workflow-detail.ts
@@ -24,6 +24,7 @@ import {
   APIPaginatedList } from "../../types/api";
 import { inactiveCrawlStates, isActive } from "../../utils/crawler";
 import { SlSelect } from "@shoelace-style/shoelace";
 import type { PageChangeEvent } from "../../components/pagination";
+import { ExclusionEditor } from "../../components/exclusion-editor";
 
 const SECTIONS = ["crawls", "watch", "settings", "logs"] as const;
 type Tab = (typeof SECTIONS)[number];
@@ -173,8 +174,12 @@ export class WorkflowDetail extends LiteElement {
       this.fetchWorkflow();
       this.fetchSeeds();
     }
-    if (changedProperties.has("isEditing") && this.isEditing) {
-      this.stopPoll();
+    if (changedProperties.has("isEditing")) {
+      if (this.isEditing) {
+        this.stopPoll();
+      } else {
+        this.getActivePanelFromHash();
+      }
     }
     if (
       !this.isEditing &&
@@ -1219,9 +1224,7 @@ export class WorkflowDetail extends LiteElement {
           >`
         : ""}
-          (this.openDialogName = undefined)}
+          ${msg("Done Editing")}
@@ -1355,6 +1358,14 @@ export class WorkflowDetail extends LiteElement {
     return data;
   }
 
+  private async onCloseExclusions(e: Event) {
+    const editor = this.querySelector("btrix-exclusion-editor");
+    if (editor && editor instanceof ExclusionEditor) {
+      await editor.onClose();
+    }
+    this.openDialogName = undefined;
+  }
+
   private async fetchSeeds(): Promise<void> {
     try {
       this.getSeedsPromise = this.getSeeds();
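
Note: this patch only produces the per-instance exclusion messages; the
consuming side lives in the crawler itself (webrecorder/browsertrix-crawler#408,
shipped in 0.12.0) and is not part of this diff. The following is a minimal
sketch of the message protocol, assuming redis-py's asyncio client; the key
layout and message shape come from the rpush call above, while the function
name and return shape are hypothetical:

    import json

    from redis import asyncio as aioredis  # redis-py >= 4.2

    async def drain_exclusion_messages(
        redis: aioredis.Redis, crawl_id: str, instance: int
    ) -> list[tuple[str, str]]:
        """Pop and parse pending exclusion messages for one crawler instance."""
        key = f"crawl-{crawl_id}-{instance}:msg"
        messages = []
        while True:
            raw = await redis.lpop(key)  # backend RPUSHes, so LPOP keeps FIFO order
            if raw is None:
                break
            msg = json.loads(raw)
            # msg is {"regex": ..., "type": "addExclusion" | "removeExclusion"}
            messages.append((msg["type"], msg["regex"]))
        return messages

Because messages only arrive when a user edits exclusions, checking the list
once per crawl loop iteration (or blocking on BLPOP) is sufficient, and no
crawler restart is needed.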
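Note: match_crawl_queue now returns however many matches fit in MAX_MATCH_SIZE
(500K of URL text) plus a nextOffset that is rounded up to the next step
boundary, or -1 once the whole queue has been scanned, so a caller can page
through every match by feeding nextOffset back in as offset. A minimal client
sketch, assuming httpx and assuming the handler is mounted at
/orgs/{oid}/crawls/{crawl_id}/queueMatchAll (the GET route's path and the auth
headers are not visible in this hunk):

    import httpx

    async def fetch_all_matches(
        api_base: str, oid: str, crawl_id: str, regex: str, headers: dict
    ) -> list[str]:
        """Page through the size-capped match response until nextOffset == -1."""
        matched: list[str] = []
        offset = 0
        async with httpx.AsyncClient(base_url=api_base, headers=headers) as client:
            while True:
                resp = await client.get(
                    f"/orgs/{oid}/crawls/{crawl_id}/queueMatchAll",
                    params={"regex": regex, "offset": offset},
                )
                data = resp.json()  # {"total": ..., "matched": [...], "nextOffset": ...}
                matched.extend(data["matched"])
                if data["nextOffset"] == -1:  # entire queue scanned
                    break
                offset = data["nextOffset"]  # rounded up to the next step boundary
        return matched

Since the cap is on response size rather than match count, each page may hold a
different number of URLs; the frontend can simply render pages as they arrive.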