diff --git a/backend/crawlconfigs.py b/backend/crawlconfigs.py
index 1caf2df0..7ec91a3f 100644
--- a/backend/crawlconfigs.py
+++ b/backend/crawlconfigs.py
@@ -104,13 +104,6 @@ class CrawlOps:
         self.crawl_configs = mdb["crawl_configs"]
         self.archive_ops = archive_ops
         self.crawl_manager = crawl_manager
-        self.default_crawl_params = [
-            "--timeout",
-            "90",
-            "--logging",
-            "behaviors,debug",
-            "--generateWACZ",
-        ]
 
         self.router = APIRouter(
             prefix="/crawlconfigs",
@@ -134,7 +127,6 @@ class CrawlOps:
         await self.crawl_manager.add_crawl_config(
             crawlconfig=crawlconfig,
             storage=archive.storage,
-            extra_crawl_params=self.default_crawl_params,
         )
 
         return result
diff --git a/backend/crawls.py b/backend/crawls.py
index d1562366..1ee55996 100644
--- a/backend/crawls.py
+++ b/backend/crawls.py
@@ -86,7 +86,7 @@ class CrawlOps:
         try:
             await self.crawls.insert_one(crawl.to_dict())
         except pymongo.errors.DuplicateKeyError:
-            print(f"Crawl Already Added: {crawl.id}")
+            print(f"Crawl Already Added: {crawl.id} - {crawl.state}")
             return False
 
         dura = int((crawl.finished - crawl.started).total_seconds())
diff --git a/backend/dockerman.py b/backend/dockerman.py
index e49b1059..49fa6fdd 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -1,18 +1,261 @@
-# pylint: skip-file
+"""
+Docker crawl manager
+"""
+
+import tarfile
+import os
+import json
 import asyncio
+from datetime import datetime
+from io import BytesIO
+from tempfile import NamedTemporaryFile
 
+import aiodocker
+
+from crawls import Crawl
 
+
+# ============================================================================
 class DockerManager:
-    def __init__(self):
-        pass
+    """ Docker Crawl Manager Interface"""
+
+    def __init__(self, archive_ops, extra_crawl_params=None):
+        self.client = aiodocker.Docker()
+
+        self.crawler_image = os.environ["CRAWLER_IMAGE"]
+        self.default_network = "crawlercloud_default"
+
+        self.archive_ops = archive_ops
+        self.crawl_ops = None
+
+        self.loop = asyncio.get_running_loop()
+        self.loop.create_task(self.run_event_loop())
+
+        self.extra_crawl_params = extra_crawl_params or []
+
+    async def run_event_loop(self):
+        """ Run Docker event loop"""
+        subscriber = self.client.events.subscribe(
+            filters=json.dumps({"type": ["container"], "label": ["btrix.archive"]})
+        )
+
+        while True:
+            event = await subscriber.get()
+            if event is None:
+                break
+
+            if event["Action"] == "die":
+                self.loop.create_task(self._handle_container_die(event["Actor"]))
+
+    def set_crawl_ops(self, ops):
+        """ set crawl ops """
+        self.crawl_ops = ops
 
     async def add_crawl_config(
         self,
         crawlconfig,
         storage,
-        extra_crawl_params: list = None,
     ):
-        print("add_crawl_config")
-        print(crawlconfig)
-        print(storage)
-        print(extra_crawl_params)
+        """ Add new crawl config """
+        cid = str(crawlconfig.id)
+        userid = crawlconfig.user
+        aid = crawlconfig.archive
+
+        labels = {
+            "btrix.user": userid,
+            "btrix.archive": aid,
+            "btrix.crawlconfig": cid,
+            "btrix.run.schedule": crawlconfig.schedule,
+            "btrix.run.manual": "1" if crawlconfig.runNow else "0",
+            "btrix.coll": crawlconfig.config.collection,
+        }
+
+        # Create Config Volume
+        volume = await self._create_volume(crawlconfig, labels)
+
+        await self._run_crawl_now(storage, labels, volume, self.extra_crawl_params)
+
+    async def update_crawl_config(self, crawlconfig):
+        """ Updating not supported for now (labels can not be altered) """
+        raise Exception("Unsupported")
+
+    async def run_crawl_config(self, cid):
+        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
+
+        volume_name = f"crawl-config-{cid}"
+        volume_obj = aiodocker.docker.DockerVolume(self.client, volume_name)
+
+        volume_data = await volume_obj.show()
+
+        labels = volume_data["Labels"]
+
+        archive = None
+
+        try:
+            archive = await self.archive_ops.get_archive_by_id(labels["btrix.archive"])
+            storage = archive.storage
+        # pylint: disable=broad-except
+        except Exception as exc:
+            print(exc, flush=True)
+            return None
+
+        container = await self._run_crawl_now(
+            storage, labels, volume_name, self.extra_crawl_params
+        )
+        return container["Id"]
+
+    async def validate_crawl_complete(self, crawlcomplete):
+        """Validate that crawl is valid by checking that container exists and label matches
+        Return completed crawl object from container"""
+
+        container = await self.client.containers.get(crawlcomplete.id)
+
+        if container["Config"]["Labels"]["btrix.user"] != crawlcomplete.user:
+            return None
+
+        crawl = self._make_crawl_for_container(
+            container,
+            "complete" if crawlcomplete.completed else "partial_complete",
+            finish_now=True,
+            filename=crawlcomplete.filename,
+            size=crawlcomplete.size,
+            hashstr=crawlcomplete.hash,
+        )
+
+        return crawl
+
+    async def delete_crawl_config_by_id(self, cid):
+        """ Delete Crawl Config by Crawl Config Id"""
+        await self._delete_volume_by_labels(
+            filters={"label": [f"btrix.crawlconfig={cid}"]}
+        )
+
+    async def delete_crawl_configs_for_archive(self, aid):
+        """ Delete Crawl Config by Archive Id"""
+        await self._delete_volume_by_labels(filters={"label": [f"btrix.archive={aid}"]})
+
+    # ========================================================================
+    async def _create_volume(self, crawlconfig, labels):
+        """ Create new volume to store the crawl config json """
+
+        name = f"crawl-config-{crawlconfig.id}"
+
+        await self.client.volumes.create({"Name": name, "Labels": labels})
+
+        await self._add_config_to_volume(
+            name, "crawl-config.json", crawlconfig.config.dict()
+        )
+
+        return name
+
+    async def _add_config_to_volume(self, volume, path, data):
+        """Add crawl config to volume, requires tar'ing the data,
+        creating a dummy container and then deleting"""
+
+        data = json.dumps(data).encode("utf-8")
+
+        container = await self.client.containers.create(
+            {
+                "Image": "tianon/true",
+                "Volumes": {volume: {}},
+                "HostConfig": {"Binds": [f"{volume}:/tmp/volume"]},
+            }
+        )
+
+        # make tarball
+        tarbuff = BytesIO()
+
+        # note: this does not seem to work with in memory buff! (tar is corrupt...)
+ with NamedTemporaryFile("w+b") as tempbuff: + tempbuff.write(data) + tempbuff.seek(0) + + with tarfile.open(mode="w", fileobj=tarbuff) as tf_fh: + tf_fh.add(name=tempbuff.name, arcname=path, recursive=False) + + tarbuff.seek(0) + + await container.put_archive("/tmp/volume", tarbuff.read()) + + await container.delete() + + async def _delete_volume_by_labels(self, filters): + """ Delete Crawl Configs by specified filter """ + + # pylint: disable=protected-access + resp = await self.client._query_json( + "volumes", method="GET", params={"filters": json.dumps(filters)} + ) + + for volume in resp["Volumes"]: + print(vol_obj, flush=True) + vol_obj = aiodocker.docker.DockerVolume(self.client, volume["Name"]) + await vol_obj.delete() + + async def _run_crawl_now(self, storage, labels, volume, extra_crawl_params=None): + # Set Run Config + command = ["crawl", "--config", "/tmp/crawlconfig/crawl-config.json"] + + if extra_crawl_params: + command += extra_crawl_params + + endpoint_with_coll_url = os.path.join( + storage.endpoint_url, "collections", labels["btrix.coll"] + "/" + ) + + env_vars = [ + f"STORE_USER={labels['btrix.user']}", + f"STORE_ARCHIVE={labels['btrix.archive']}", + f"STORE_ENDPOINT_URL={endpoint_with_coll_url}", + f"STORE_ACCESS_KEY={storage.access_key}", + f"STORE_SECRET_KEY={storage.secret_key}", + "WEBHOOK_URL=http://backend:8000/crawls/done", + ] + + run_config = { + "Image": self.crawler_image, + "Volumes": {volume: {}}, + "Labels": labels, + "Cmd": command, + "Env": env_vars, + "HostConfig": { + "Binds": [f"{volume}:/tmp/crawlconfig"], + "NetworkMode": self.default_network, + }, + } + + return await self.client.containers.run(run_config) + + async def _handle_container_die(self, actor): + """ Handle crawl container shutdown """ + container = await self.client.containers.get(actor["ID"]) + + if actor["Attributes"]["exitCode"] != 0: + crawl = self._make_crawl_for_container(container, "failed", True) + await self.crawl_ops.store_crawl(crawl) + + await container.delete() + + # pylint: disable=no-self-use,too-many-arguments + def _make_crawl_for_container( + self, container, state, finish_now=False, filename=None, size=None, hashstr=None + ): + """ Make a crawl object from a container data""" + labels = container["Config"]["Labels"] + return Crawl( + id=container["Id"], + state=state, + user=labels["btrix.user"], + aid=labels["btrix.archive"], + cid=labels["btrix.crawlconfig"], + schedule=labels["btrix.run.schedule"], + manual=labels["btrix.run.manual"] == "1", + started=datetime.fromisoformat(container["State"]["StartedAt"][:19]), + finished=datetime.utcnow().replace(microsecond=0, tzinfo=None) + if finish_now + else None, + filename=filename, + size=size, + hash=hashstr, + ) diff --git a/backend/k8sman.py b/backend/k8sman.py index 480f3bdf..67a255ed 100644 --- a/backend/k8sman.py +++ b/backend/k8sman.py @@ -22,7 +22,7 @@ class K8SManager: # pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments """K8SManager, manager creation of k8s resources from crawl api requests""" - def __init__(self, namespace=DEFAULT_NAMESPACE): + def __init__(self, extra_crawl_params=None, namespace=DEFAULT_NAMESPACE): config.load_incluster_config() self.crawl_ops = None @@ -33,6 +33,7 @@ class K8SManager: self.batch_beta_api = client.BatchV1beta1Api() self.namespace = namespace + self.extra_crawl_params = extra_crawl_params or [] self.crawler_image = os.environ.get("CRAWLER_IMAGE") self.crawler_image_pull_policy = "IfNotPresent" @@ -86,7 +87,6 @@ class K8SManager: self, 
         crawlconfig,
         storage,
-        extra_crawl_params: list = None,
     ):
         """add new crawl as cron job, store crawl config in configmap"""
         cid = str(crawlconfig.id)
@@ -137,10 +137,8 @@ class K8SManager:
 
         suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)
 
-        extra_crawl_params = extra_crawl_params or []
-
         job_template = self._get_job_template(
-            cid, labels, annotations, crawlconfig.crawlTimeout, extra_crawl_params
+            cid, labels, annotations, crawlconfig.crawlTimeout, self.extra_crawl_params
         )
 
         spec = client.V1beta1CronJobSpec(
@@ -286,19 +284,13 @@ class K8SManager:
         if not manual:
             await self._delete_job(job.metadata.name)
 
-        return Crawl(
-            id=crawlcomplete.id,
-            state="complete" if crawlcomplete.completed else "partial_complete",
-            user=crawlcomplete.user,
-            aid=job.metadata.labels["btrix.archive"],
-            cid=job.metadata.labels["btrix.crawlconfig"],
-            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
-            manual=manual,
-            started=job.status.start_time.replace(tzinfo=None),
-            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
+        return self._make_crawl_for_job(
+            job,
+            "complete" if crawlcomplete.completed else "partial_complete",
+            finish_now=True,
             filename=crawlcomplete.filename,
             size=crawlcomplete.size,
-            hash=crawlcomplete.hash,
+            hashstr=crawlcomplete.hash,
         )
 
     async def stop_crawl(self, job_name, aid, graceful=True):
@@ -362,7 +354,9 @@ class K8SManager:
     # Internal Methods
 
     # pylint: disable=no-self-use
-    def _make_crawl_for_job(self, job, state, finish_now=False):
+    def _make_crawl_for_job(
+        self, job, state, finish_now=False, filename=None, size=None, hashstr=None
+    ):
         """ Make a crawl object from a job"""
         return Crawl(
             id=job.metadata.name,
@@ -376,6 +370,9 @@ class K8SManager:
             finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None)
             if finish_now
             else None,
+            filename=filename,
+            size=size,
+            hash=hashstr,
         )
 
     async def _delete_job(self, name):
diff --git a/backend/main.py b/backend/main.py
index 6db70b87..adcde24d 100644
--- a/backend/main.py
+++ b/backend/main.py
@@ -29,7 +29,7 @@ class BrowsertrixAPI:
         self.app = _app
 
         self.default_storage_endpoint_url = os.environ.get(
-            "STORE_ENDPOINT_URL", "http://localhost:8010/store-bucket/"
+            "STORE_ENDPOINT_URL", "http://minio:9000/test-bucket/"
         )
 
         self.default_storage_access_key = os.environ.get("STORE_ACCESS_KEY", "access")
@@ -38,16 +38,13 @@ class BrowsertrixAPI:
         self.email = EmailSender()
         self.crawl_manager = None
 
-        # pylint: disable=import-outside-toplevel
-        if os.environ.get("KUBERNETES_SERVICE_HOST"):
-            from k8sman import K8SManager
-
-            self.crawl_manager = K8SManager()
-        else:
-            from dockerman import DockerManager
-
-            self.crawl_manager = DockerManager()
-            # raise Exception("Currently, only running in Kubernetes is supported")
+        self.default_crawl_params = [
+            "--timeout",
+            "90",
+            "--logging",
+            "behaviors,debug",
+            "--generateWACZ",
+        ]
 
         self.mdb = init_db()
 
@@ -65,6 +62,19 @@ class BrowsertrixAPI:
             self.app, self.mdb, self.fastapi_users, self.email, current_active_user
         )
 
+        # pylint: disable=import-outside-toplevel
+        if os.environ.get("KUBERNETES_SERVICE_HOST"):
+            from k8sman import K8SManager
+
+            self.crawl_manager = K8SManager(self.default_crawl_params)
+        else:
+            from dockerman import DockerManager
+
+            self.crawl_manager = DockerManager(
+                self.archive_ops, self.default_crawl_params
+            )
+            # raise Exception("Currently, only running in Kubernetes is supported")
+
         self.crawl_config_ops = init_crawl_config_api(
             self.mdb,
             current_active_user,
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 0a58fc3e..63daf7fc 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -3,3 +3,4 @@ fastapi-users[mongodb]==6.0.0
 loguru
 aiofiles
 kubernetes-asyncio
+aiodocker
diff --git a/docker-compose.yml b/docker-compose.yml
index a9dddecb..34cb0692 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -7,26 +7,50 @@ services:
     ports:
       - 8000:8000
 
+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+
     env_file:
       - ./config.env
 
+    depends_on:
+      - mongo
+      - minio
+
   mongo:
     image: mongo
-    env_file:
-      - ./config.env
-
     volumes:
       - mongodata:/data/db
 
+    env_file:
+      - ./config.env
+
   minio:
     image: minio/minio
-    command: server /data
+    command: server /data --console-address :9001
     ports:
-      - 8010:9000
+      - 9000:9000
+      - 9001:9001
 
     volumes:
       - miniodata:/data
 
+    env_file:
+      - ./config.env
+
+  init_minio_bucket:
+    image: minio/mc
+    entrypoint: "/bin/sh"
+    command: ['-c', 'mc mb local/test-bucket; mc policy set public local/test-bucket' ]
+
+    env_file:
+      - ./config.env
+
+    depends_on:
+      - minio
+
+
+
 volumes:
   mongodata:
   miniodata: