Optimize single-page crawl workflows (#2656)
For single page crawls:
- Always force 1 browser to be used, ignoring the browser windows / scale setting
- Don't use custom PVC volumes for the crawler / redis pods, just use emptyDir; there is no chance of the crawler being interrupted and restarted on a different machine for a single page

Adds an 'is_single_page' check to CrawlConfigOps that treats a config as single-page when it has exactly one seed and either a page limit of 1 or a 'page' scopeType with no extra hops.

Fixes #2655
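For reference, a minimal standalone sketch of that detection logic. The real implementation is CrawlConfigOps.is_single_page() operating on the RawCrawlConfig model (see the diff below); the plain dicts and example seed URLs here are purely illustrative.

# Illustrative sketch mirroring the is_single_page() check added below,
# using plain dicts instead of the RawCrawlConfig pydantic model.
def is_single_page(config: dict) -> bool:
    seeds = config.get("seeds") or []
    # must have exactly one seed
    if len(seeds) != 1:
        return False
    # an explicit page limit of 1 is always a single-page crawl
    if config.get("limit") == 1:
        return True
    # otherwise: page scope and no extra hops (per-seed values take precedence)
    extra_hops = seeds[0].get("extraHops") or config.get("extraHops") or 0
    scope_type = seeds[0].get("scopeType") or config.get("scopeType")
    return extra_hops == 0 and scope_type == "page"

assert is_single_page({"seeds": [{"url": "https://example.com/"}], "limit": 1})
assert is_single_page(
    {"seeds": [{"url": "https://example.com/"}], "scopeType": "page", "extraHops": 0}
)
assert not is_single_page(
    {"seeds": [{"url": "https://example.com/"}], "scopeType": "page", "extraHops": 1}
)

For such configs the API forces browserWindows to 1 and the operator skips PVC creation, as the hunks below show.
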
Commit: 8ea16393c5
Parent: 86c4d326e9

@@ -45,6 +45,7 @@ from .models import (
     CrawlerProxy,
     CrawlerProxies,
     ValidateCustomBehavior,
+    RawCrawlConfig,
 )
 from .utils import (
     dt_now,

@@ -223,15 +224,18 @@ class CrawlConfigOps:
     ) -> CrawlConfigAddedResponse:
         """Add new crawl config"""

+        # ensure crawlChannel is valid
+        if not self.get_channel_crawler_image(config_in.crawlerChannel):
+            raise HTTPException(status_code=404, detail="crawler_not_found")
+
         # Overrides scale if set
         if config_in.browserWindows is None:
             config_in.browserWindows = browser_windows_from_scale(
                 cast(int, config_in.scale)
             )

-        # ensure crawlChannel is valid
-        if not self.get_channel_crawler_image(config_in.crawlerChannel):
-            raise HTTPException(status_code=404, detail="crawler_not_found")
+        if self.is_single_page(config_in.config):
+            config_in.browserWindows = 1

         profileid = None
         if isinstance(config_in.profileid, UUID):

@@ -321,6 +325,19 @@ class CrawlConfigOps:
            execMinutesQuotaReached=exec_mins_quota_reached,
        )

+    def is_single_page(self, config: RawCrawlConfig):
+        """return true if this config represents a single page crawl"""
+        if not config.seeds or len(config.seeds) != 1:
+            return False
+
+        if config.limit == 1:
+            return True
+
+        extra_hops = config.seeds[0].extraHops or config.extraHops
+        scope_type = config.seeds[0].scopeType or config.scopeType
+
+        return extra_hops == 0 and scope_type == "page"
+
     def _validate_link_selectors(self, link_selectors: List[str]):
         """Validate link selectors

@@ -435,6 +452,10 @@ class CrawlConfigOps:
         if update.config and update.config.lang:
             validate_language_code(update.config.lang)

+        if update.config or update.browserWindows:
+            if self.is_single_page(update.config or orig_crawl_config.config):
+                update.browserWindows = 1
+
         # indicates if any k8s crawl config settings changed
         changed = False
         changed = changed or (

@@ -1021,6 +1042,7 @@ class CrawlConfigOps:
             warc_prefix=self.get_warc_prefix(org, crawlconfig),
             storage_filename=storage_filename,
             profile_filename=profile_filename or "",
+            is_single_page=self.is_single_page(crawlconfig.config),
         )
         await self.add_new_crawl(crawl_id, crawlconfig, user, org, manual=True)
         return crawl_id

@@ -220,6 +220,7 @@ class CrawlManager(K8sAPI):
         warc_prefix: str,
         storage_filename: str,
         profile_filename: str,
+        is_single_page: bool,
     ) -> str:
         """create new crawl job from config"""
         cid = str(crawlconfig.id)

@@ -244,6 +245,7 @@ class CrawlManager(K8sAPI):
             storage_filename=storage_filename,
             profile_filename=profile_filename,
             proxy_id=crawlconfig.proxyId or DEFAULT_PROXY_ID,
+            is_single_page=is_single_page,
         )

     async def reload_running_crawl_config(self, crawl_id: str):

@@ -95,6 +95,7 @@ class K8sAPI:
         profile_filename: str = "",
         qa_source: str = "",
         proxy_id: str = "",
+        is_single_page: bool = False,
     ):
         """load job template from yaml"""
         if not crawl_id:

@@ -119,6 +120,7 @@ class K8sAPI:
             "profile_filename": profile_filename,
             "qa_source": qa_source,
             "proxy_id": proxy_id,
+            "is_single_page": "1" if is_single_page else "0",
         }

         data = self.templates.env.get_template("crawl_job.yaml").render(params)

@@ -142,6 +144,7 @@ class K8sAPI:
         profile_filename: str = "",
         qa_source: str = "",
         proxy_id: str = "",
+        is_single_page: bool = False,
     ) -> str:
         """load and init crawl job via k8s api"""
         crawl_id, data = self.new_crawl_job_yaml(

@@ -161,6 +164,7 @@ class K8sAPI:
             profile_filename=profile_filename,
             qa_source=qa_source,
             proxy_id=proxy_id,
+            is_single_page=is_single_page,
         )

         # create job directly

@@ -101,6 +101,8 @@ class CrawlOperator(BaseOperator):

     paused_expires_delta: timedelta

+    num_browsers_per_pod: int
+
     def __init__(self, *args):
         super().__init__(*args)

@@ -125,6 +127,8 @@ class CrawlOperator(BaseOperator):

         self.paused_expires_delta = timedelta(minutes=paused_crawl_limit_minutes)

+        self.num_browsers_per_pod = int(os.environ.get("NUM_BROWSERS", 1))
+
     def init_routes(self, app):
         """init routes for this operator"""

@@ -181,6 +185,7 @@ class CrawlOperator(BaseOperator):
             max_crawl_size=int(spec.get("maxCrawlSize") or 0),
             scheduled=spec.get("manual") != "1",
             qa_source_crawl_id=spec.get("qaSourceCrawlId"),
+            is_single_page=spec.get("isSinglePage") == "1",
         )

         if crawl.qa_source_crawl_id:

@@ -301,7 +306,7 @@ class CrawlOperator(BaseOperator):
                 status.stopReason = stop_reason
                 await self.mark_finished(crawl, status, state)

-        children = self._load_redis(params, status, data.children)
+        children = self._load_redis(params, status, crawl, data.children)

         storage_path = crawl.storage.get_storage_extra_path(oid)
         storage_secret = crawl.storage.get_storage_secret_name(oid)

@@ -368,10 +373,8 @@ class CrawlOperator(BaseOperator):
         # crawl_scale is the number of pods to create
         crawler_scale = scale_from_browser_windows(crawl.browser_windows)

-        browsers_per_pod = int(os.environ.get("NUM_BROWSERS", 1))
-
         for i in range(0, crawler_scale):
-            if status.pagesFound < i * browsers_per_pod:
+            if status.pagesFound < i * self.num_browsers_per_pod:
                 break

             children.extend(

@@ -392,7 +395,7 @@ class CrawlOperator(BaseOperator):
             "resyncAfterSeconds": status.resync_after,
         }

-    def _load_redis(self, params, status: CrawlStatus, children):
+    def _load_redis(self, params, status: CrawlStatus, crawl: CrawlSpec, children):
         name = f"redis-{params['id']}"
         has_pod = name in children[POD]

@@ -400,6 +403,8 @@ class CrawlOperator(BaseOperator):
         params["name"] = name
         params["cpu"] = pod_info.newCpu or params.get("redis_cpu")
         params["memory"] = pod_info.newMemory or params.get("redis_memory")
+        params["no_pvc"] = crawl.is_single_page
+
         restart_reason = None
         if has_pod:
             restart_reason = pod_info.should_restart_pod()

@@ -870,7 +875,7 @@ class CrawlOperator(BaseOperator):
         if redis_pod in pods:
             # if has other pods, keep redis pod until they are removed
             if len(pods) > 1:
-                new_children = self._load_redis(params, status, children)
+                new_children = self._load_redis(params, status, crawl, children)
                 await self.increment_pod_exec_time(pods, crawl, status)

         # keep pvs until pods are removed

@@ -140,6 +140,7 @@ class CronJobOperator(BaseOperator):
             storage_filename=self.crawl_config_ops.default_filename_template,
             profile_filename=profile_filename or "",
             proxy_id=crawlconfig.proxyId or "",
+            is_single_page=self.crawl_config_ops.is_single_page(crawlconfig.config),
         )

         return MCDecoratorSyncResponse(attachments=list(yaml.safe_load_all(crawljob)))

@@ -86,6 +86,7 @@ class CrawlSpec(BaseModel):
     max_crawl_size: int = 0
     qa_source_crawl_id: Optional[str] = ""
     proxy_id: Optional[str] = None
+    is_single_page: bool = False

     @property
     def db_crawl_id(self) -> str:

@@ -331,7 +331,7 @@ def sample_crawl_data():
     return {
         "runNow": False,
         "name": "Test Crawl",
-        "config": {"seeds": [{"url": "https://example.com/"}]},
+        "config": {"seeds": [{"url": "https://example.com/"}], "extraHops": 1},
         "tags": ["tag1", "tag2"],
     }

@@ -6,6 +6,7 @@ from .conftest import API_PREFIX


 cid = None
+cid_single_page = None
 UPDATED_NAME = "Updated name"
 UPDATED_DESCRIPTION = "Updated description"
 UPDATED_TAGS = ["tag3", "tag4"]

@@ -67,6 +68,37 @@ def test_verify_default_browser_windows(
     assert data["browserWindows"] == 2


+def test_add_crawl_config_single_page(
+    crawler_auth_headers, default_org_id, sample_crawl_data
+):
+    # Create crawl config
+    sample_crawl_data["config"]["limit"] = 1
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
+        headers=crawler_auth_headers,
+        json=sample_crawl_data,
+    )
+    assert r.status_code == 200
+
+    data = r.json()
+    global cid_single_page
+    cid_single_page = data["id"]
+
+
+def test_verify_default_browser_windows_single_page(
+    crawler_auth_headers, default_org_id, sample_crawl_data
+):
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{cid_single_page}/",
+        headers=crawler_auth_headers,
+    )
+    assert r.status_code == 200
+
+    data = r.json()
+    assert data.get("scale") is None
+    assert data["browserWindows"] == 1
+
+
 def test_custom_browser_windows(
     crawler_auth_headers, default_org_id, sample_crawl_data
 ):

@@ -11,8 +11,8 @@ def test_get_config_by_created_by(crawler_auth_headers, default_org_id, crawler_
         f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs?userid={crawler_userid}",
         headers=crawler_auth_headers,
     )
-    assert len(r.json()["items"]) == 7
-    assert r.json()["total"] == 7
+    assert len(r.json()["items"]) == 8
+    assert r.json()["total"] == 8


 def test_get_config_by_modified_by(

@@ -23,8 +23,8 @@ def test_get_config_by_modified_by(
         f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs?modifiedBy={crawler_userid}",
         headers=crawler_auth_headers,
     )
-    assert len(r.json()["items"]) == 7
-    assert r.json()["total"] == 7
+    assert len(r.json()["items"]) == 8
+    assert r.json()["total"] == 8


 def test_get_configs_by_first_seed(

@@ -362,9 +362,9 @@ def test_sort_crawl_configs(
         headers=crawler_auth_headers,
     )
     data = r.json()
-    assert data["total"] == 13
+    assert data["total"] == 14
     items = data["items"]
-    assert len(items) == 13
+    assert len(items) == 14

     last_created = None
     for config in items:

@@ -39,3 +39,5 @@ spec:

   pausedAt: "{{ pausedAt }}"

+  isSinglePage: "{{ is_single_page }}"
+

@@ -1,3 +1,4 @@
+{% if not no_pvc %}
 # -------
 # PVC
 # -------

@@ -23,7 +24,7 @@ spec:
   storageClassName: {{ volume_storage_class }}
   {% endif %}

-
+{% endif %}

 # -------
 # CRAWLER

@@ -67,8 +68,12 @@ spec:
            name: qa-replay-{{ qa_source_crawl_id }}
       {% endif %}
        - name: crawl-data
+      {% if not no_pvc %}
          persistentVolumeClaim:
            claimName: {{ name }}
+      {% else %}
+         emptyDir: {}
+      {% endif %}
       {% if proxy_id %}
       - name: proxies
         secret:

@@ -1,3 +1,4 @@
+{% if not no_pvc %}
 # -------
 # PVC
 # -------

@@ -22,6 +23,7 @@ spec:
   {% if volume_storage_class %}
   storageClassName: {{ volume_storage_class }}
   {% endif %}
+{% endif %}

 # --------
 # REDIS

@@ -51,8 +53,12 @@ spec:
            path: redis.conf

       - name: redis-data
+      {% if not no_pvc %}
         persistentVolumeClaim:
           claimName: {{ name }}
+      {% else %}
+        emptyDir: {}
+      {% endif %}

     affinity:
       nodeAffinity:

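To see what the new no_pvc flag changes in the rendered pod spec, here is a small Jinja2 sketch. The trimmed-down volume snippet and the claim name are illustrative stand-ins for the crawler/redis chart templates above, not the actual files.

from jinja2 import Template

# simplified stand-in for the crawl-data / redis-data volume definitions above
VOLUME_SNIPPET = """\
- name: redis-data
{% if not no_pvc %}
  persistentVolumeClaim:
    claimName: {{ name }}
{% else %}
  emptyDir: {}
{% endif %}
"""

# single-page crawl: the operator sets no_pvc from crawl.is_single_page (see
# _load_redis above), so only the emptyDir branch is emitted
print(Template(VOLUME_SNIPPET).render(no_pvc=True, name="redis-crawl-example"))

# regular crawl: the volume stays backed by the per-crawl PVC
print(Template(VOLUME_SNIPPET).render(no_pvc=False, name="redis-crawl-example"))

Since a single-page crawl has no chance of being interrupted and restarted on a different machine, losing the emptyDir contents along with the pod is acceptable, which is the rationale given in the commit description above.
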
@@ -277,11 +277,11 @@ crawler_memory_base: 1024Mi
 # number of browser workers per crawler instances
 crawler_browser_instances: 2

-# number of browser workers per crawler instances for QA runs
+# number of browser workers per QA pod to run for QA runs
 # defaults to 'crawler_browser_instances' if not set
-# qa_browser_instances: 2
+qa_browser_instances: 1

-# fixed scale (number of crawler pods) for QA runs
+# fixed scale (number of QA pods) to run
 qa_scale: 1

 # this value is added to crawler_cpu_base, for each additional browser