use redis based queue instead of url for crawl done webhook

update docker setup to support redis webhook, add consistent CRAWL_ARGS, additional fixes
This commit is contained in:
Ilya Kreymer 2021-10-10 12:18:28 -07:00
parent 4ae4005d74
commit c38e0b7bf7
7 changed files with 71 additions and 33 deletions
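
Context note (not part of the commit): the backend now drains a Redis list named "crawls-done" (see run_crawl_complete_loop in the crawls.py diff below) instead of exposing an HTTP /_crawls/done endpoint, and the crawler container is pointed at that list via WEBHOOK_URL={redis_url}/{crawls_done_key}. The following is a minimal producer-side sketch of what such a message could look like; the push_crawl_done helper and the concrete values are hypothetical, and only the filename/size/hash/completed field names are taken from the crawlcomplete usage in this diff.

    # Hypothetical sketch (not from this commit): push a crawl-done message onto the
    # same Redis list that CrawlOps.run_crawl_complete_loop() drains with BLPOP.
    import asyncio
    import json

    import aioredis  # same client library the backend uses


    async def push_crawl_done(redis_url="redis://redis/0", key="crawls-done"):
        # mirror the backend's connection setup from init_redis()
        redis = await aioredis.from_url(
            redis_url, encoding="utf-8", decode_responses=True
        )
        msg = {
            # field names taken from crawlcomplete.* usage in process_crawl_complete();
            # values are made up, and CrawlCompleteIn may require additional fields
            # (e.g. a container id) that are not shown in this diff
            "filename": "example.wacz",
            "size": 1024,
            "hash": "abc123",
            "completed": True,
        }
        # RPUSH pairs with the consumer's BLPOP for FIFO delivery
        await redis.rpush(key, json.dumps(msg))


    if __name__ == "__main__":
        asyncio.run(push_crawl_done())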

View File

@@ -204,7 +204,11 @@ def init_crawl_config_api(mdb, user_dep, archive_ops, crawl_manager):
     @router.get("")
     async def get_crawl_configs(archive: Archive = Depends(archive_crawl_dep)):
         results = await ops.get_crawl_configs(archive)
-        return {"crawl_configs": [res.serialize() for res in results]}
+        return {
+            "crawl_configs": [
+                res.serialize(exclude={"archive", "runNow"}) for res in results
+            ]
+        }

     @router.get("/{cid}")
     async def get_crawl_config(crawl_config: CrawlConfig = Depends(crawls_dep)):

View File

@@ -1,6 +1,7 @@
 """ Crawl API """

 import asyncio
+import json

 from typing import Optional, List, Dict
 from datetime import datetime
@@ -88,6 +89,7 @@ class CrawlOps:
         self.crawl_manager = crawl_manager
         self.crawl_configs = crawl_configs
         self.archives = archives
+        self.crawls_done_key = "crawls-done"

         self.redis = None
         asyncio.create_task(self.init_redis(redis_url))
@@ -96,10 +98,29 @@
     async def init_redis(self, redis_url):
         """ init redis async """
-        self.redis = await aioredis.from_url(redis_url)
+        self.redis = await aioredis.from_url(
+            redis_url, encoding="utf-8", decode_responses=True
+        )
+
+        loop = asyncio.get_running_loop()
+        loop.create_task(self.run_crawl_complete_loop())
+
+    async def run_crawl_complete_loop(self):
+        """ Wait for any crawls done from redis queue """
+        while True:
+            try:
+                _, value = await self.redis.blpop(self.crawls_done_key, timeout=0)
+                value = json.loads(value)
+                await self.on_handle_crawl_complete(CrawlCompleteIn(**value))
+
+            # pylint: disable=broad-except
+            except Exception as exc:
+                print(f"Retrying crawls done loop: {exc}")
+                await asyncio.sleep(10)

     async def on_handle_crawl_complete(self, msg: CrawlCompleteIn):
         """ Handle completed crawl, add to crawls db collection, also update archive usage """
+        print(msg, flush=True)
         crawl, crawl_file = await self.crawl_manager.process_crawl_complete(msg)
         if not crawl:
             print("Not a valid crawl complete msg!", flush=True)
@@ -205,13 +226,6 @@ def init_crawls_api(app, mdb, redis_url, crawl_manager, crawl_config_ops, archiv
     archive_crawl_dep = archives.archive_crawl_dep

-    @app.post("/_crawls/done", tags=["_internal"])
-    async def crawl_done(msg: CrawlCompleteIn):
-        loop = asyncio.get_running_loop()
-        loop.create_task(ops.on_handle_crawl_complete(msg))
-        return {"success": True}
-
     @app.get("/archives/{aid}/crawls", tags=["crawls"])
     async def list_crawls(archive: Archive = Depends(archive_crawl_dep)):
         return await ops.list_crawls(archive.id)

View File

@@ -41,9 +41,11 @@ class BaseMongoModel(BaseModel):
         data["id"] = str(data.pop("_id"))
         return cls(**data)

-    def serialize(self):
+    def serialize(self, **opts):
         """convert Archive to dict"""
-        return self.dict(exclude_unset=True, exclude_defaults=True, exclude_none=True)
+        return self.dict(
+            exclude_unset=True, exclude_defaults=True, exclude_none=True, **opts
+        )

     def to_dict(self, **opts):
         """convert to dict for mongo"""

View File

@@ -19,7 +19,7 @@ from scheduler import run_scheduler

 from archives import S3Storage

-from crawls import Crawl
+from crawls import Crawl, CrawlFile


 # ============================================================================
@@ -33,6 +33,11 @@ class DockerManager:
         self.crawler_image = os.environ["CRAWLER_IMAGE"]
         self.default_network = os.environ.get("CRAWLER_NETWORK", "btrix-cloud-net")

+        self.redis_url = os.environ["REDIS_URL"]
+        self.crawls_done_key = "crawls-done"
+
+        self.crawl_args = os.environ["CRAWL_ARGS"]
+
         self.archive_ops = archive_ops
         self.crawl_ops = None
@@ -44,7 +49,7 @@ class DockerManager:
                 name="default",
                 access_key=os.environ["STORE_ACCESS_KEY"],
                 secret_key=os.environ["STORE_SECRET_KEY"],
-                endpont_url=os.environ["STORE_ENDPOINT_URL"],
+                endpoint_url=os.environ["STORE_ENDPOINT_URL"],
             )
         }
@@ -137,7 +142,7 @@ class DockerManager:
         # pylint: disable=no-else-return
         if storage.type == "default":
-            return self.storages[storage], storage.path
+            return self.storages[storage.name], storage.path

         else:
             return storage, ""
@@ -169,7 +174,7 @@ class DockerManager:
             "btrix.user": userid,
             "btrix.archive": aid,
             "btrix.crawlconfig": cid,
-            "btrix.coll": crawlconfig.config.collection,
+            "btrix.tag.coll": crawlconfig.config.collection,
         }

         if crawlconfig.crawlTimeout:
@@ -186,13 +191,15 @@ class DockerManager:
         )

         if crawlconfig.runNow:
-            await self._run_crawl_now(
+            return await self._run_crawl_now(
                 storage,
                 storage_path,
                 labels,
                 volume,
             )

+        return ""
+
     async def update_crawl_schedule(self, cid, schedule):
         """ Update the schedule for existing crawl config """
@@ -272,10 +279,9 @@ class DockerManager:
             print(exc, flush=True)
             return None

-        container = await self._run_crawl_now(
+        return await self._run_crawl_now(
             storage, storage_path, labels, volume_name, schedule, manual
         )

-        return container["id"][:12]
-
     async def process_crawl_complete(self, crawlcomplete):
         """Validate that crawl is valid by checking that container exists and label matches
@@ -290,12 +296,15 @@ class DockerManager:
             container,
             "complete" if crawlcomplete.completed else "partial_complete",
             finish_now=True,
-            filename=crawlcomplete.filename,
-            size=crawlcomplete.size,
-            hashstr=crawlcomplete.hash,
         )

-        return crawl
+        crawl_file = CrawlFile(
+            filename=crawlcomplete.filename,
+            size=crawlcomplete.size,
+            hash=crawlcomplete.hash,
+        )
+
+        return crawl, crawl_file

     async def scale_crawl(self):  # job_name, aid, parallelism=1):
         """ Scale running crawl, currently only supported in k8s"""
@@ -394,7 +403,7 @@ class DockerManager:
             "--config",
             "/tmp/crawlconfig/crawl-config.json",
             "--redisStoreUrl",
-            "redis://redis:6379/0",
+            self.redis_url,
         ]

         if self.extra_crawl_params:
@@ -411,7 +420,8 @@ class DockerManager:
             f"STORE_ACCESS_KEY={storage.access_key}",
             f"STORE_SECRET_KEY={storage.secret_key}",
             f"STORE_PATH={storage_path}",
-            "WEBHOOK_URL=http://backend:8000/_crawls/done",
+            f"WEBHOOK_URL={self.redis_url}/{self.crawls_done_key}",
+            f"CRAWL_ARGS={self.crawl_args}",
         ]

         labels["btrix.run.schedule"] = schedule
@@ -429,7 +439,8 @@ class DockerManager:
             },
         }

-        return await self.client.containers.run(run_config)
+        container = await self.client.containers.run(run_config)
+        return container["id"]

     async def _list_running_containers(self, labels):
         results = await self.client.containers.list(
@@ -454,12 +465,15 @@ class DockerManager:
         await container.delete()

     # pylint: disable=no-self-use,too-many-arguments
-    def _make_crawl_for_container(
-        self, container, state, finish_now=False, filename=None, size=None, hashstr=None
-    ):
+    def _make_crawl_for_container(self, container, state, finish_now=False):
         """ Make a crawl object from a container data"""
         labels = container["Config"]["Labels"]

+        tags = {}
+        for name in labels:
+            if name.startswith("btrix.tag."):
+                tags[name[len("btrix.tag.") :]] = labels.get(name)
+
         return Crawl(
             id=container["Id"],
             state=state,
@@ -472,7 +486,5 @@ class DockerManager:
             finished=datetime.utcnow().replace(microsecond=0, tzinfo=None)
             if finish_now
             else None,
-            filename=filename,
-            size=size,
-            hash=hashstr,
+            tags=tags,
         )
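
A small standalone rendition (not from this commit) of the "btrix.tag." label handling added to _make_crawl_for_container(): prefixed container labels become entries in the Crawl's tags dict. The sample labels below are made-up values.

    # Hypothetical input: labels as written by run_crawl_config() above
    labels = {
        "btrix.user": "user-id",
        "btrix.archive": "archive-id",
        "btrix.crawlconfig": "config-id",
        "btrix.tag.coll": "my-collection",
    }

    # same prefix-stripping loop as in the diff
    tags = {}
    for name in labels:
        if name.startswith("btrix.tag."):
            tags[name[len("btrix.tag.") :]] = labels.get(name)

    print(tags)  # -> {'coll': 'my-collection'}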

View File

@@ -16,6 +16,7 @@ data:
   REDIS_URL: "{{ .Values.redis_url }}"

+  REDIS_CRAWLS_DONE_KEY: "crawls-done"

 ---
 apiVersion: v1
@@ -26,7 +27,8 @@ metadata:
 data:
   CRAWL_ARGS: "{{ .Values.crawler_args }} --redisStoreUrl {{ .Values.redis_url }}"

-  WEBHOOK_URL: "http://browsertrix-cloud.default/_crawls/done"
+  #WEBHOOK_URL: "http://browsertrix-cloud.default/_crawls/done"
+  WEBHOOK_URL: "{{ .Values.redis_url }}/crawls-done"

   STORE_USER: ""

 ---

View File

@@ -65,7 +65,7 @@ crawler_namespace: "crawlers"

 crawl_retries: 1

 # browsertrix-crawler args:
-crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastPort 9037 --headless"
+crawler_args: "--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastPort 9037"

View File

@@ -14,6 +14,8 @@ STORE_SECRET_KEY=PASSW0RD

 MC_HOST_local=http://ADMIN:PASSW0RD@minio:9000

+REDIS_URL=redis://redis/0
+
 # enable to send verification emails
 #EMAIL_SMTP_HOST=smtp.gmail.com
 #EMAIL_SMTP_PORT=587
@@ -23,3 +25,5 @@ MC_HOST_local=http://ADMIN:PASSW0RD@minio:9000

 # Browsertrix Crawler image to use
 CRAWLER_IMAGE=webrecorder/browsertrix-crawler
+
+CRAWL_ARGS="--timeout 90 --logging stats,behaviors,debug --generateWACZ --screencastPort 9037"