exclusion optimizations: dynamic exclusions (part of #1216): (#1268)

- instead of restarting the crawler when an exclusion is added or removed, push a message onto a Redis list (one list per crawler instance); see the sketch below
- the existing queue is no longer filtered on the backend; filtering is now handled by the crawler itself (implemented in 0.12.0 via webrecorder/browsertrix-crawler#408)
- match response optimization: instead of returning the first 1000 matches, cap the matched-URL payload at roughly 500K characters and return however many matches fit within that size (to support optional pagination on the frontend)
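A minimal sketch of the two backend ideas, assuming a redis.asyncio client; the `crawl-{id}-{i}:msg` key layout, the message shape, and the 500K `MAX_MATCH_SIZE` cap mirror the backend diff below, while the helper names (`queue_exclusion_message`, `collect_matches`) are illustrative only:

```python
import json
import re
from typing import List

from redis import asyncio as aioredis

MAX_MATCH_SIZE = 500000  # cap on total matched-URL characters per response


async def queue_exclusion_message(
    redis: aioredis.Redis, crawl_id: str, regex: str, add: bool, scale: int = 1
) -> None:
    """Tell every running crawler instance about an added/removed exclusion."""
    msg = json.dumps(
        {"regex": regex, "type": "addExclusion" if add else "removeExclusion"}
    )
    for i in range(scale):
        # one message list per crawler instance; the crawler drains it itself,
        # so no pod restart or backend queue filtering is needed
        await redis.rpush(f"crawl-{crawl_id}-{i}:msg", msg)


def collect_matches(urls: List[str], regex: str) -> List[str]:
    """Collect matching URLs until the response would exceed MAX_MATCH_SIZE."""
    pattern = re.compile(regex)
    matched, size = [], 0
    for url in urls:
        if pattern.search(url):
            size += len(url)
            matched.append(url)
        if size > MAX_MATCH_SIZE:
            break  # caller can paginate from here via nextOffset
    return matched
```

In the actual change, matching is read from the Redis queue in batches of `DEFAULT_RANGE_LIMIT` entries and a `nextOffset` is returned once the cap is hit, so the frontend can optionally paginate; see the backend diff below.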
Author: Ilya Kreymer, 2023-11-06 09:36:25 -08:00 (committed by GitHub)
commit 0935d43a97 (parent 0b8bbcf8e6)
4 changed files with 93 additions and 99 deletions

File 1 of 4: backend Crawl API module (CrawlOps)

@@ -1,7 +1,6 @@
 """ Crawl API """
 # pylint: disable=too-many-lines
-import asyncio
 import json
 import re
 import urllib.parse
@@ -34,6 +33,10 @@ from .models import (
 )

+MAX_MATCH_SIZE = 500000
+DEFAULT_RANGE_LIMIT = 50
+
 # ============================================================================
 class CrawlOps(BaseCrawlOps):
     """Crawl Ops"""
@@ -306,16 +309,6 @@
             # fallback to old crawler queue
             return reversed(await redis.lrange(key, -offset - count, -offset - 1))

-    async def _crawl_queue_rem(self, redis, key, values, dircount=1):
-        try:
-            return await redis.zrem(key, *values)
-        except exceptions.ResponseError:
-            # fallback to old crawler queue
-            res = 0
-            for value in values:
-                res += await redis.lrem(key, dircount, value)
-            return res
-
     async def get_crawl_queue(self, crawl_id, offset, count, regex):
         """get crawl queue"""
@@ -345,13 +338,13 @@
         return {"total": total, "results": results, "matched": matched}

-    async def match_crawl_queue(self, crawl_id, regex, offset=0, limit=1000):
+    async def match_crawl_queue(self, crawl_id, regex, offset=0):
         """get list of urls that match regex, starting at offset and at most
         around 'limit'. (limit rounded to next step boundary, so
         limit <= next_offset < limit + step"""
         total = 0
         matched = []
-        step = 50
+        step = DEFAULT_RANGE_LIMIT

         async with self.get_redis(crawl_id) as redis:
             try:
@@ -366,6 +359,7 @@
                 raise HTTPException(status_code=400, detail="invalid_regex") from exc

             next_offset = -1
+            size = 0

             for count in range(offset, total, step):
                 results = await self._crawl_queue_range(
@@ -374,75 +368,17 @@
                 for result in results:
                     url = json.loads(result)["url"]
                     if regex.search(url):
+                        size += len(url)
                         matched.append(url)

-                # if exceeded limit set nextOffset to next step boundary
+                # if size of match response exceeds size limit, set nextOffset
                 # and break
-                if len(matched) >= limit:
+                if size > MAX_MATCH_SIZE:
                     next_offset = count + step
                     break

         return {"total": total, "matched": matched, "nextOffset": next_offset}

-    async def filter_crawl_queue(self, crawl_id, regex):
-        """filter out urls that match regex"""
-        # pylint: disable=too-many-locals
-        total = 0
-        q_key = f"{crawl_id}:q"
-        s_key = f"{crawl_id}:s"
-        step = 50
-        num_removed = 0
-
-        async with self.get_redis(crawl_id) as redis:
-            try:
-                total = await self._crawl_queue_len(redis, f"{crawl_id}:q")
-            except exceptions.ConnectionError:
-                # can't connect to redis, likely not initialized yet
-                pass
-
-            dircount = -1
-            try:
-                regex = re.compile(regex)
-            except re.error as exc:
-                raise HTTPException(status_code=400, detail="invalid_regex") from exc
-
-            count = 0
-
-            # pylint: disable=fixme
-            # todo: do this in a more efficient way?
-            # currently quite inefficient as redis does not have a way
-            # to atomically check and remove value from list
-            # so removing each jsob block by value
-            while count < total:
-                if dircount == -1 and count > total / 2:
-                    dircount = 1
-
-                results = await self._crawl_queue_range(redis, q_key, count, step)
-                count += step
-
-                qrems = []
-                srems = []
-
-                for result in results:
-                    url = json.loads(result)["url"]
-                    if regex.search(url):
-                        srems.append(url)
-                        # await redis.srem(s_key, url)
-                        # res = await self._crawl_queue_rem(redis, q_key, result, dircount)
-                        qrems.append(result)
-
-                if not srems:
-                    continue
-
-                await redis.srem(s_key, *srems)
-                res = await self._crawl_queue_rem(redis, q_key, qrems, dircount)
-                if res:
-                    count -= res
-                    num_removed += res
-                    print(f"Removed {res} from queue", flush=True)
-
-        return num_removed
-
     async def get_errors_from_redis(
         self, crawl_id: str, page_size: int = DEFAULT_PAGE_SIZE, page: int = 1
     ):
@@ -471,6 +407,18 @@
         cid = crawl_raw.get("cid")

+        scale = crawl_raw.get("scale", 1)
+
+        async with self.get_redis(crawl_id) as redis:
+            query = {
+                "regex": regex,
+                "type": "addExclusion" if add else "removeExclusion",
+            }
+            query_str = json.dumps(query)
+
+            for i in range(0, scale):
+                await redis.rpush(f"crawl-{crawl_id}-{i}:msg", query_str)
+
         new_config = await self.crawl_configs.add_or_remove_exclusion(
             regex, cid, org, user, add
         )
@@ -480,21 +428,7 @@
             {"$set": {"config": new_config.dict()}},
         )

-        resp = {"success": True}
-
-        # restart crawl pods
-        restart_c = self.crawl_manager.rollover_restart_crawl(crawl_id)
-
-        if add:
-            filter_q = self.filter_crawl_queue(crawl_id, regex)
-            _, num_removed = await asyncio.gather(restart_c, filter_q)
-            resp["num_removed"] = num_removed
-        else:
-            await restart_c
-
-        return resp
+        return {"success": True}

     async def update_crawl_state_if_allowed(
         self, crawl_id, state, allowed_from, **kwargs
@@ -815,12 +749,11 @@ def init_crawls_api(app, user_dep, *args):
         crawl_id,
         regex: str,
         offset: int = 0,
-        limit: int = 1000,
         org: Organization = Depends(org_crawl_dep),
     ):
         await ops.get_crawl_raw(crawl_id, org)
-        return await ops.match_crawl_queue(crawl_id, regex, offset, limit)
+        return await ops.match_crawl_queue(crawl_id, regex, offset)

     @app.post(
         "/orgs/{oid}/crawls/{crawl_id}/exclusions",

File 2 of 4: frontend CrawlQueue component

@@ -5,6 +5,7 @@ import throttle from "lodash/fp/throttle";
 import LiteElement, { html } from "../utils/LiteElement";
 import type { AuthState } from "../utils/AuthService";
+import { PropertyValueMap } from "lit";

 type Pages = string[];
 type ResponseData = {
@@ -46,6 +47,12 @@
   /** `new RegExp` constructor string */
   regex: string = "";

+  @property({ type: Array })
+  private exclusions = [];
+
+  @state()
+  private exclusionsRx: RegExp[] = [];
+
   @state()
   private queue?: ResponseData;
@@ -62,6 +69,16 @@
     super.disconnectedCallback();
   }

+  protected updated(
+    changedProperties: PropertyValueMap<any> | Map<PropertyKey, unknown>
+  ): void {
+    if (changedProperties.has("exclusions")) {
+      this.exclusionsRx = this.exclusions
+        ? this.exclusions.map((x) => new RegExp(x))
+        : [];
+    }
+  }
+
   willUpdate(changedProperties: Map<string, any>) {
     if (
       changedProperties.has("authState") ||
@@ -104,6 +121,7 @@
           <btrix-numbered-list class="text-xs break-all" aria-live="polite">
             ${this.queue.results.map((url, idx) => {
               const isMatch = this.queue!.matched.some((v) => v === url);
+              const isExcluded = !isMatch && this.isExcluded(url);
               return html`
                 <btrix-numbered-list-item>
                   <span class="${isMatch ? "text-red-600" : ""}" slot="marker"
@@ -112,6 +130,8 @@
                   <a
                     class="${isMatch
                       ? "text-red-500 hover:text-red-400"
+                      : isExcluded
+                      ? "text-gray-500 hover:text-gray-400 line-through"
                       : "text-blue-500 hover:text-blue-400"}"
                     href=${url}
                     target="_blank"
@@ -204,6 +224,16 @@
     }
   }

+  isExcluded(url: string) {
+    for (const rx of this.exclusionsRx) {
+      if (rx.test(url)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   private async getQueue(): Promise<ResponseData> {
     const offset = "0";
     const count = this.pageSize.toString();

File 3 of 4: frontend ExclusionEditor component

@@ -135,6 +135,7 @@
         crawlId=${this.crawlId!}
         .authState=${this.authState}
         regex=${this.regex}
+        .exclusions=${(this.config && this.config.exclude) || []}
         matchedTotal=${this.matchedURLs?.length || 0}
       ></btrix-crawl-queue>`;
   }
@@ -224,10 +225,21 @@
     return data;
   }

-  private async handleAddRegex(e: ExclusionAddEvent) {
+  async handleAddRegex(e?: ExclusionAddEvent) {
     this.isSubmitting = true;

-    const { regex, onSuccess } = e.detail;
+    let regex = null;
+    let onSuccess = null;
+
+    if (e) {
+      ({ regex, onSuccess } = e.detail);
+    } else {
+      // if not provided, use current regex, if set
+      if (!this.regex) {
+        return;
+      }
+      regex = this.regex;
+    }

     try {
       const params = new URLSearchParams({ regex });
@@ -250,7 +262,9 @@
         this.matchedURLs = null;
         await this.updateComplete;

+        if (onSuccess) {
           onSuccess();
+        }
         this.dispatchEvent(new CustomEvent("on-success"));
       } else {
         throw data;
@@ -271,4 +285,10 @@
     this.isSubmitting = false;
   }

+  async onClose() {
+    if (this.regex && this.isActiveCrawl) {
+      await this.handleAddRegex();
+    }
+  }
 }

File 4 of 4: frontend WorkflowDetail page

@@ -24,6 +24,7 @@ import { APIPaginatedList } from "../../types/api";
 import { inactiveCrawlStates, isActive } from "../../utils/crawler";
 import { SlSelect } from "@shoelace-style/shoelace";
 import type { PageChangeEvent } from "../../components/pagination";
+import { ExclusionEditor } from "../../components/exclusion-editor";

 const SECTIONS = ["crawls", "watch", "settings", "logs"] as const;
 type Tab = (typeof SECTIONS)[number];
@@ -173,8 +174,12 @@
       this.fetchWorkflow();
       this.fetchSeeds();
     }
-    if (changedProperties.has("isEditing") && this.isEditing) {
+    if (changedProperties.has("isEditing")) {
+      if (this.isEditing) {
         this.stopPoll();
+      } else {
+        this.getActivePanelFromHash();
+      }
     }
     if (
       !this.isEditing &&
@@ -1219,9 +1224,7 @@
             ></btrix-exclusion-editor>`
           : ""}
         <div slot="footer">
-          <sl-button
-            size="small"
-            @click=${() => (this.openDialogName = undefined)}
+          <sl-button size="small" @click=${this.onCloseExclusions}
             >${msg("Done Editing")}</sl-button
           >
         </div>
@@ -1355,6 +1358,14 @@
     return data;
   }

+  private async onCloseExclusions(e: Event) {
+    const editor = this.querySelector("btrix-exclusion-editor");
+    if (editor && editor instanceof ExclusionEditor) {
+      await editor.onClose();
+    }
+    this.openDialogName = undefined;
+  }
+
   private async fetchSeeds(): Promise<void> {
     try {
       this.getSeedsPromise = this.getSeeds();