dockerman: initial pass

- support for creating, deleting crawlconfigs, running crawls on-demand
- config stored in volume
- listen to docker events and clean up containers when they exit (event handling sketched below)
Ilya Kreymer 2021-08-24 22:49:06 -07:00
parent 20b19f932f
commit 91e9fc8699
7 changed files with 317 additions and 50 deletions
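
A minimal standalone sketch of the event-cleanup flow from the last bullet above (an illustration, not code from this commit; it assumes only aiodocker and a reachable Docker socket — the real handling lives in DockerManager.run_event_loop and _handle_container_die in the diff below):

import asyncio
import json

import aiodocker


async def watch_crawl_containers():
    docker = aiodocker.Docker()
    # only container events for containers carrying the btrix.archive label
    subscriber = docker.events.subscribe(
        filters=json.dumps({"type": ["container"], "label": ["btrix.archive"]})
    )
    try:
        while True:
            event = await subscriber.get()
            if event is None:  # event stream closed
                break
            if event["Action"] == "die":
                container = await docker.containers.get(event["Actor"]["ID"])
                # a full handler would record the crawl's final state before cleanup
                await container.delete()
    finally:
        await docker.close()


if __name__ == "__main__":
    asyncio.run(watch_crawl_containers())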


@@ -104,13 +104,6 @@ class CrawlOps:
         self.crawl_configs = mdb["crawl_configs"]
         self.archive_ops = archive_ops
         self.crawl_manager = crawl_manager
-        self.default_crawl_params = [
-            "--timeout",
-            "90",
-            "--logging",
-            "behaviors,debug",
-            "--generateWACZ",
-        ]

         self.router = APIRouter(
             prefix="/crawlconfigs",
@@ -134,7 +127,6 @@ class CrawlOps:
         await self.crawl_manager.add_crawl_config(
             crawlconfig=crawlconfig,
             storage=archive.storage,
-            extra_crawl_params=self.default_crawl_params,
         )

         return result


@@ -86,7 +86,7 @@ class CrawlOps:
         try:
             await self.crawls.insert_one(crawl.to_dict())
         except pymongo.errors.DuplicateKeyError:
-            print(f"Crawl Already Added: {crawl.id}")
+            print(f"Crawl Already Added: {crawl.id} - {crawl.state}")
             return False

         dura = int((crawl.finished - crawl.started).total_seconds())


@@ -1,18 +1,261 @@
-# pylint: skip-file
+"""
+Docker crawl manager
+"""
+
+import tarfile
+import os
+import json
 import asyncio
+
+from datetime import datetime
+from io import BytesIO
+from tempfile import NamedTemporaryFile
+
+import aiodocker
+
+from crawls import Crawl


+# ============================================================================
 class DockerManager:
-    def __init__(self):
-        pass
+    """ Docker Crawl Manager Interface"""
+
+    def __init__(self, archive_ops, extra_crawl_params=None):
+        self.client = aiodocker.Docker()
+
+        self.crawler_image = os.environ["CRAWLER_IMAGE"]
+        self.default_network = "crawlercloud_default"
+
+        self.archive_ops = archive_ops
+        self.crawl_ops = None
+
+        self.loop = asyncio.get_running_loop()
+        self.loop.create_task(self.run_event_loop())
+
+        self.extra_crawl_params = extra_crawl_params or []
+
+    async def run_event_loop(self):
+        """ Run Docker event loop"""
+        subscriber = self.client.events.subscribe(
+            filters=json.dumps({"type": ["container"], "label": ["btrix.archive"]})
+        )
+
+        while True:
+            event = await subscriber.get()
+            if event is None:
+                break
+
+            if event["Action"] == "die":
+                self.loop.create_task(self._handle_container_die(event["Actor"]))
+
+    def set_crawl_ops(self, ops):
+        """ set crawl ops """
+        self.crawl_ops = ops

     async def add_crawl_config(
         self,
         crawlconfig,
         storage,
-        extra_crawl_params: list = None,
     ):
-        print("add_crawl_config")
-        print(crawlconfig)
-        print(storage)
-        print(extra_crawl_params)
+        """ Add new crawl config """
+        cid = str(crawlconfig.id)
+        userid = crawlconfig.user
+        aid = crawlconfig.archive
+
+        labels = {
+            "btrix.user": userid,
+            "btrix.archive": aid,
+            "btrix.crawlconfig": cid,
+            "btrix.run.schedule": crawlconfig.schedule,
+            "btrix.run.manual": "1" if crawlconfig.runNow else "0",
+            "btrix.coll": crawlconfig.config.collection,
+        }
+
+        # Create Config Volume
+        volume = await self._create_volume(crawlconfig, labels)
+
+        await self._run_crawl_now(storage, labels, volume, self.extra_crawl_params)
+
+    async def update_crawl_config(self, crawlconfig):
+        """ Updating not supported for now (labels can not be altered) """
+        raise Exception("Unsupported")
+
+    async def run_crawl_config(self, cid):
+        """ Run crawl job for cron job based on specified crawlconfig id (cid) """
+        volume_name = f"crawl-config-{cid}"
+        volume_obj = aiodocker.docker.DockerVolume(self.client, volume_name)
+
+        volume_data = await volume_obj.show()
+
+        labels = volume_data["Labels"]
+
+        archive = None
+
+        try:
+            archive = await self.archive_ops.get_archive_by_id(labels["btrix.archive"])
+            storage = archive.storage
+        # pylint: disable=broad-except
+        except Exception as exc:
+            print(exc, flush=True)
+            return None
+
+        container = await self._run_crawl_now(
+            storage, labels, volume_name, self.extra_crawl_params
+        )
+
+        return container["Id"]
+
+    async def validate_crawl_complete(self, crawlcomplete):
+        """Validate that crawl is valid by checking that container exists and label matches
+        Return completed crawl object from container"""
+        container = await self.client.containers.get(crawlcomplete.id)
+
+        if container["Config"]["Labels"]["btrix.user"] != crawlcomplete.user:
+            return None
+
+        crawl = self._make_crawl_for_container(
+            container,
+            "complete" if crawlcomplete.completed else "partial_complete",
+            finish_now=True,
+            filename=crawlcomplete.filename,
+            size=crawlcomplete.size,
+            hashstr=crawlcomplete.hash,
+        )
+
+        return crawl
+
+    async def delete_crawl_config_by_id(self, cid):
+        """ Delete Crawl Config by Crawl Config Id"""
+        await self._delete_volume_by_labels(
+            filters={"label": [f"btrix.crawlconfig={cid}"]}
+        )
+
+    async def delete_crawl_configs_for_archive(self, aid):
+        """ Delete Crawl Config by Archive Id"""
+        await self._delete_volume_by_labels(filters={"label": [f"btrix.archive={aid}"]})
+
+    # ========================================================================
+    async def _create_volume(self, crawlconfig, labels):
+        """ Create new volume to store the crawl config json """
+        name = f"crawl-config-{crawlconfig.id}"
+
+        await self.client.volumes.create({"Name": name, "Labels": labels})
+
+        await self._add_config_to_volume(
+            name, "crawl-config.json", crawlconfig.config.dict()
+        )
+
+        return name
+
+    async def _add_config_to_volume(self, volume, path, data):
+        """Add crawl config to volume, requires tar'ing the data,
+        creating a dummy container and then deleting"""
+        data = json.dumps(data).encode("utf-8")
+
+        container = await self.client.containers.create(
+            {
+                "Image": "tianon/true",
+                "Volumes": {volume: {}},
+                "HostConfig": {"Binds": [f"{volume}:/tmp/volume"]},
+            }
+        )
+
+        # make tarball
+        tarbuff = BytesIO()
+
+        # note: this does not seem to work with in memory buff! (tar is corrupt...)
+        with NamedTemporaryFile("w+b") as tempbuff:
+            tempbuff.write(data)
+            tempbuff.seek(0)
+
+            with tarfile.open(mode="w", fileobj=tarbuff) as tf_fh:
+                tf_fh.add(name=tempbuff.name, arcname=path, recursive=False)
+
+        tarbuff.seek(0)
+
+        await container.put_archive("/tmp/volume", tarbuff.read())
+
+        await container.delete()
+
+    async def _delete_volume_by_labels(self, filters):
+        """ Delete Crawl Configs by specified filter """
+        # pylint: disable=protected-access
+        resp = await self.client._query_json(
+            "volumes", method="GET", params={"filters": json.dumps(filters)}
+        )
+
+        for volume in resp["Volumes"]:
+            vol_obj = aiodocker.docker.DockerVolume(self.client, volume["Name"])
+            print(vol_obj, flush=True)
+            await vol_obj.delete()
+
+    async def _run_crawl_now(self, storage, labels, volume, extra_crawl_params=None):
+        # Set Run Config
+        command = ["crawl", "--config", "/tmp/crawlconfig/crawl-config.json"]
+
+        if extra_crawl_params:
+            command += extra_crawl_params
+
+        endpoint_with_coll_url = os.path.join(
+            storage.endpoint_url, "collections", labels["btrix.coll"] + "/"
+        )
+
+        env_vars = [
+            f"STORE_USER={labels['btrix.user']}",
+            f"STORE_ARCHIVE={labels['btrix.archive']}",
+            f"STORE_ENDPOINT_URL={endpoint_with_coll_url}",
+            f"STORE_ACCESS_KEY={storage.access_key}",
+            f"STORE_SECRET_KEY={storage.secret_key}",
+            "WEBHOOK_URL=http://backend:8000/crawls/done",
+        ]
+
+        run_config = {
+            "Image": self.crawler_image,
+            "Volumes": {volume: {}},
+            "Labels": labels,
+            "Cmd": command,
+            "Env": env_vars,
+            "HostConfig": {
+                "Binds": [f"{volume}:/tmp/crawlconfig"],
+                "NetworkMode": self.default_network,
+            },
+        }
+
+        return await self.client.containers.run(run_config)
+
+    async def _handle_container_die(self, actor):
+        """ Handle crawl container shutdown """
+        container = await self.client.containers.get(actor["ID"])
+
+        if actor["Attributes"]["exitCode"] != 0:
+            crawl = self._make_crawl_for_container(container, "failed", True)
+            await self.crawl_ops.store_crawl(crawl)
+
+        await container.delete()
+
+    # pylint: disable=no-self-use,too-many-arguments
+    def _make_crawl_for_container(
+        self, container, state, finish_now=False, filename=None, size=None, hashstr=None
+    ):
+        """ Make a crawl object from a container data"""
+        labels = container["Config"]["Labels"]
+
+        return Crawl(
+            id=container["Id"],
+            state=state,
+            user=labels["btrix.user"],
+            aid=labels["btrix.archive"],
+            cid=labels["btrix.crawlconfig"],
+            schedule=labels["btrix.run.schedule"],
+            manual=labels["btrix.run.manual"] == "1",
+            started=datetime.fromisoformat(container["State"]["StartedAt"][:19]),
+            finished=datetime.utcnow().replace(microsecond=0, tzinfo=None)
+            if finish_now
+            else None,
+            filename=filename,
+            size=size,
+            hash=hashstr,
+        )


@@ -22,7 +22,7 @@ class K8SManager:
     # pylint: disable=too-many-instance-attributes,too-many-locals,too-many-arguments
     """K8SManager, manager creation of k8s resources from crawl api requests"""

-    def __init__(self, namespace=DEFAULT_NAMESPACE):
+    def __init__(self, extra_crawl_params=None, namespace=DEFAULT_NAMESPACE):
         config.load_incluster_config()

         self.crawl_ops = None
@@ -33,6 +33,7 @@ class K8SManager:
         self.batch_beta_api = client.BatchV1beta1Api()

         self.namespace = namespace
+        self.extra_crawl_params = extra_crawl_params or []
         self.crawler_image = os.environ.get("CRAWLER_IMAGE")
         self.crawler_image_pull_policy = "IfNotPresent"
@@ -86,7 +87,6 @@ class K8SManager:
         self,
         crawlconfig,
         storage,
-        extra_crawl_params: list = None,
     ):
         """add new crawl as cron job, store crawl config in configmap"""
         cid = str(crawlconfig.id)
@@ -137,10 +137,8 @@ class K8SManager:
         suspend, schedule, run_now = self._get_schedule_suspend_run_now(crawlconfig)

-        extra_crawl_params = extra_crawl_params or []
-
         job_template = self._get_job_template(
-            cid, labels, annotations, crawlconfig.crawlTimeout, extra_crawl_params
+            cid, labels, annotations, crawlconfig.crawlTimeout, self.extra_crawl_params
         )

         spec = client.V1beta1CronJobSpec(
@@ -286,19 +284,13 @@ class K8SManager:
         if not manual:
             await self._delete_job(job.metadata.name)

-        return Crawl(
-            id=crawlcomplete.id,
-            state="complete" if crawlcomplete.completed else "partial_complete",
-            user=crawlcomplete.user,
-            aid=job.metadata.labels["btrix.archive"],
-            cid=job.metadata.labels["btrix.crawlconfig"],
-            schedule=job.metadata.annotations.get("btrix.run.schedule", ""),
-            manual=manual,
-            started=job.status.start_time.replace(tzinfo=None),
-            finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None),
+        return self._make_crawl_for_job(
+            job,
+            "complete" if crawlcomplete.completed else "partial_complete",
+            finish_now=True,
             filename=crawlcomplete.filename,
             size=crawlcomplete.size,
-            hash=crawlcomplete.hash,
+            hashstr=crawlcomplete.hash,
         )

     async def stop_crawl(self, job_name, aid, graceful=True):
@@ -362,7 +354,9 @@ class K8SManager:
     # Internal Methods
     # pylint: disable=no-self-use
-    def _make_crawl_for_job(self, job, state, finish_now=False):
+    def _make_crawl_for_job(
+        self, job, state, finish_now=False, filename=None, size=None, hashstr=None
+    ):
         """ Make a crawl object from a job"""
         return Crawl(
             id=job.metadata.name,
@@ -376,6 +370,9 @@ class K8SManager:
             finished=datetime.datetime.utcnow().replace(microsecond=0, tzinfo=None)
             if finish_now
             else None,
+            filename=filename,
+            size=size,
+            hash=hashstr,
         )

     async def _delete_job(self, name):


@@ -29,7 +29,7 @@ class BrowsertrixAPI:
         self.app = _app

         self.default_storage_endpoint_url = os.environ.get(
-            "STORE_ENDPOINT_URL", "http://localhost:8010/store-bucket/"
+            "STORE_ENDPOINT_URL", "http://minio:9000/test-bucket/"
         )

         self.default_storage_access_key = os.environ.get("STORE_ACCESS_KEY", "access")
@@ -38,16 +38,13 @@ class BrowsertrixAPI:
         self.email = EmailSender()
         self.crawl_manager = None

-        # pylint: disable=import-outside-toplevel
-        if os.environ.get("KUBERNETES_SERVICE_HOST"):
-            from k8sman import K8SManager
-
-            self.crawl_manager = K8SManager()
-        else:
-            from dockerman import DockerManager
-
-            self.crawl_manager = DockerManager()
-            # raise Exception("Currently, only running in Kubernetes is supported")
+        self.default_crawl_params = [
+            "--timeout",
+            "90",
+            "--logging",
+            "behaviors,debug",
+            "--generateWACZ",
+        ]

         self.mdb = init_db()
@@ -65,6 +62,19 @@ class BrowsertrixAPI:
             self.app, self.mdb, self.fastapi_users, self.email, current_active_user
         )

+        # pylint: disable=import-outside-toplevel
+        if os.environ.get("KUBERNETES_SERVICE_HOST"):
+            from k8sman import K8SManager
+
+            self.crawl_manager = K8SManager(self.default_crawl_params)
+        else:
+            from dockerman import DockerManager
+
+            self.crawl_manager = DockerManager(
+                self.archive_ops, self.default_crawl_params
+            )
+            # raise Exception("Currently, only running in Kubernetes is supported")
+
         self.crawl_config_ops = init_crawl_config_api(
             self.mdb,
             current_active_user,
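
For reference, a small sketch (not part of the commit) of how these defaults reach the crawler: both managers receive default_crawl_params at construction, and the Docker manager's _run_crawl_now appends them to the base command before starting the container, roughly like this:

# Sketch: how default_crawl_params end up on the crawler command line
# (mirrors _run_crawl_now in the Docker manager diff above; values taken from this diff)
default_crawl_params = ["--timeout", "90", "--logging", "behaviors,debug", "--generateWACZ"]

command = ["crawl", "--config", "/tmp/crawlconfig/crawl-config.json"]
command += default_crawl_params
# command is now:
# ['crawl', '--config', '/tmp/crawlconfig/crawl-config.json',
#  '--timeout', '90', '--logging', 'behaviors,debug', '--generateWACZ']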


@@ -3,3 +3,4 @@ fastapi-users[mongodb]==6.0.0
 loguru
 aiofiles
 kubernetes-asyncio
+aiodocker


@@ -7,26 +7,50 @@ services:
     ports:
       - 8000:8000

+    volumes:
+      - /var/run/docker.sock:/var/run/docker.sock
+
     env_file:
       - ./config.env

+    depends_on:
+      - mongo
+      - minio
+
   mongo:
     image: mongo
-    env_file:
-      - ./config.env
     volumes:
       - mongodata:/data/db

+    env_file:
+      - ./config.env
+
   minio:
     image: minio/minio
-    command: server /data
+    command: server /data --console-address :9001
     ports:
-      - 8010:9000
+      - 9000:9000
+      - 9001:9001

     volumes:
       - miniodata:/data

+    env_file:
+      - ./config.env
+
+  init_minio_bucket:
+    image: minio/mc
+    entrypoint: "/bin/sh"
+    command: ['-c', 'mc mb local/test-bucket; mc policy set public local/test-bucket' ]
+    env_file:
+      - ./config.env
+    depends_on:
+      - minio
+
 volumes:
   mongodata:
   miniodata: