diff --git a/backend/dockerman.py b/backend/dockerman.py
index 2e1027a0..71f65583 100644
--- a/backend/dockerman.py
+++ b/backend/dockerman.py
@@ -150,15 +150,6 @@ class DockerManager:
         """ set crawl ops """
         self.crawl_ops = ops
 
-    async def get_storage(self, storage):
-        """ get storage from existing storage object or reference """
-
-        # pylint: disable=no-else-return
-        if storage.type == "default":
-            return self.storages[storage.name], storage.path
-        else:
-            return storage, ""
-
     async def check_storage(self, storage_name, is_default=False):
         """ check if storage_name is valid storage """
         # if not default, don't validate
@@ -168,6 +159,10 @@ class DockerManager:
         # if default, ensure name is in default storages list
         return self.storages[storage_name]
 
+    async def get_default_storage(self, name):
+        """ return default storage by name """
+        return self.storages[name]
+
     async def update_archive_storage(self, aid, userid, storage):
         """ No storage kept for docker manager """
 
@@ -188,7 +183,7 @@ class DockerManager:
         if storage.type == "default":
             labels["btrix.def_storage_path"] = storage.path
 
-        storage, storage_path = await self.get_storage(storage)
+        storage, storage_path = await self._get_storage_and_path(storage)
 
         if crawlconfig.crawlTimeout:
             labels["btrix.timeout"] = str(crawlconfig.crawlTimeout)
@@ -304,7 +299,7 @@ class DockerManager:
             archive = await self.archive_ops.get_archive_by_id(
                 uuid.UUID(labels["btrix.archive"])
             )
-            storage, storage_path = await self.get_storage(archive.storage)
+            storage, storage_path = await self._get_storage_and_path(archive.storage)
 
        # pylint: disable=broad-except
        except Exception as exc:
@@ -403,6 +398,17 @@ class DockerManager:
 
         return name
 
+    async def _get_storage_and_path(self, storage):
+        """get storage from existing storage object or reference
+        return storage and storage_path (for default storage)
+        """
+
+        # pylint: disable=no-else-return
+        if storage.type == "default":
+            return self.storages[storage.name], storage.path
+        else:
+            return storage, ""
+
     async def _add_config_to_volume(self, volume, path, data):
         """Add crawl config to volume, requires tar'ing the data,
         creating a dummy container and then deleting"""
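Note (illustrative, not part of the patch): the Docker manager now resolves a storage reference via `_get_storage_and_path` and exposes named default storages via `get_default_storage`. A minimal standalone sketch of the same branching, using hypothetical stand-in objects rather than a real `DockerManager`:

# Hypothetical stand-ins; the real logic lives on DockerManager above.
from types import SimpleNamespace

storages = {"default": SimpleNamespace(endpoint_url="https://s3.example.com/bucket/")}

def resolve(storage):
    # "default" storage: look up the shared storage by name, keep the per-archive path
    if storage.type == "default":
        return storages[storage.name], storage.path
    # custom storage: use it as-is, with no extra path component
    return storage, ""

print(resolve(SimpleNamespace(type="default", name="default", path="archive-1/")))
print(resolve(SimpleNamespace(type="s3", name=None, path="", endpoint_url="https://s3.other.example/b/")))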
diff --git a/backend/k8sman.py b/backend/k8sman.py
index 5add66c7..9f1db917 100644
--- a/backend/k8sman.py
+++ b/backend/k8sman.py
@@ -9,6 +9,7 @@ import base64
 
 from kubernetes_asyncio import client, config, watch
 from kubernetes_asyncio.stream import WsApiClient
 
+from archives import S3Storage
 from crawls import Crawl, CrawlOut, CrawlFile
 
@@ -35,7 +36,7 @@ class K8SManager:
         self.batch_beta_api = client.BatchV1beta1Api()
 
         self.namespace = namespace
-        self._default_storage_endpoints = {}
+        self._default_storages = {}
 
         self.crawler_image = os.environ["CRAWLER_IMAGE"]
         self.crawler_image_pull_policy = os.environ["CRAWLER_PULL_POLICY"]
@@ -88,19 +89,11 @@ class K8SManager:
             print(exc)
 
     # pylint: disable=unused-argument
-    async def get_storage(self, storage_name, is_default=False):
-        """Check if storage_name is valid by checking existing secret
-        is_default flag ignored"""
-        try:
-            return await self.core_api.read_namespaced_secret(
-                f"storage-{storage_name}",
-                namespace=self.namespace,
-            )
-        except Exception:
-            # pylint: disable=broad-except,raise-missing-from
-            raise Exception(f"Storage {storage_name} not found")
-
-        return None
+    async def check_storage(self, storage_name, is_default=False):
+        """Check if storage is valid by trying to get the storage secret
+        Will throw if not valid, otherwise return True"""
+        await self._get_storage_secret(storage_name)
+        return True
 
     async def update_archive_storage(self, aid, userid, storage):
         """Update storage by either creating a per-archive secret, if using custom storage
@@ -172,7 +165,7 @@ class K8SManager:
             "btrix.crawlconfig": cid,
         }
 
-        await self.get_storage(storage_name)
+        await self.check_storage(storage_name)
 
         # Create Config Map
         config_map = self._create_config_map(crawlconfig, labels)
@@ -356,13 +349,32 @@ class K8SManager:
 
     async def get_default_storage_access_endpoint(self, name):
         """ Get access_endpoint for default storage """
-        if name not in self._default_storage_endpoints:
-            storage_secret = await self.get_storage(name, is_default=True)
-            self._default_storage_endpoints[name] = base64.standard_b64decode(
-                storage_secret.data["STORE_ACCESS_ENDPOINT_URL"]
-            ).decode()
-
-        return self._default_storage_endpoints[name]
+        return (await self.get_default_storage(name)).access_endpoint_url
+
+    async def get_default_storage(self, name):
+        """ get default storage """
+        if name not in self._default_storages:
+            storage_secret = await self._get_storage_secret(name)
+
+            access_endpoint_url = self._secret_data(
+                storage_secret, "STORE_ACCESS_ENDPOINT_URL"
+            )
+            endpoint_url = self._secret_data(storage_secret, "STORE_ENDPOINT_URL")
+            access_key = self._secret_data(storage_secret, "STORE_ACCESS_KEY")
+            secret_key = self._secret_data(storage_secret, "STORE_SECRET_KEY")
+
+            self._default_storages[name] = S3Storage(
+                access_key=access_key,
+                secret_key=secret_key,
+                endpoint_url=endpoint_url,
+                access_endpoint_url=access_endpoint_url,
+            )
+
+        return self._default_storages[name]
+
+    def _secret_data(self, storage, name):
+        """ decode secret storage data """
+        return base64.standard_b64decode(storage.data[name]).decode()
 
     async def get_running_crawl(self, name, aid):
         """Get running crawl (job) with given name, or none
@@ -524,6 +536,20 @@ class K8SManager:
 
         return config_map
 
+    # pylint: disable=unused-argument
+    async def _get_storage_secret(self, storage_name):
+        """ Check if storage_name is valid by checking existing secret """
+        try:
+            return await self.core_api.read_namespaced_secret(
+                f"storage-{storage_name}",
+                namespace=self.namespace,
+            )
+        except Exception:
+            # pylint: disable=broad-except,raise-missing-from
+            raise Exception(f"Storage {storage_name} not found")
+
+        return None
+
     # pylint: disable=no-self-use
     def _get_schedule_suspend_run_now(self, crawlconfig):
         """ get schedule/suspend/run_now data based on crawlconfig """
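Note (illustrative, not part of the patch): `get_default_storage` builds an `S3Storage` from the values `_secret_data` decodes out of the `storage-<name>` Kubernetes secret, then caches it in `self._default_storages`. A minimal sketch of that decoding step, using a hypothetical stand-in for the secret object:

# Hypothetical secret stand-in; a real k8s Secret's .data holds base64-encoded strings.
import base64
from types import SimpleNamespace

secret = SimpleNamespace(
    data={
        "STORE_ENDPOINT_URL": base64.standard_b64encode(b"https://s3.example.com/bucket/").decode(),
        "STORE_ACCESS_ENDPOINT_URL": base64.standard_b64encode(b"https://cdn.example.com/").decode(),
        "STORE_ACCESS_KEY": base64.standard_b64encode(b"access").decode(),
        "STORE_SECRET_KEY": base64.standard_b64encode(b"secret").decode(),
    }
)

def secret_data(storage_secret, name):
    # same decode as K8SManager._secret_data above
    return base64.standard_b64decode(storage_secret.data[name]).decode()

print(secret_data(secret, "STORE_ENDPOINT_URL"))  # -> https://s3.example.com/bucket/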
diff --git a/backend/storages.py b/backend/storages.py
index 626904e8..e62a0c21 100644
--- a/backend/storages.py
+++ b/backend/storages.py
@@ -3,6 +3,7 @@ Storage API
 """
 from typing import Union
 from urllib.parse import urlsplit
+from contextlib import asynccontextmanager
 
 from fastapi import Depends, HTTPException
 from aiobotocore.session import get_session
@@ -52,20 +53,19 @@ def init_storages_api(archive_ops, crawl_manager, user_dep):
 
 
 # ============================================================================
-async def verify_storage_upload(storage, filename):
-    """ Test credentials and storage endpoint by uploading an empty test file """
-
+@asynccontextmanager
+async def get_s3_client(storage):
+    """ context manager for s3 client"""
     if not storage.endpoint_url.endswith("/"):
         storage.endpoint_url += "/"
 
-    session = get_session()
-
     parts = urlsplit(storage.endpoint_url)
     bucket, key = parts.path[1:].split("/", 1)
-    key += filename
 
     endpoint_url = parts.scheme + "://" + parts.netloc
 
+    session = get_session()
+
     async with session.create_client(
         "s3",
         region_name="",
@@ -73,6 +73,39 @@ async def verify_storage_upload(storage, filename):
         aws_access_key_id=storage.access_key,
         aws_secret_access_key=storage.secret_key,
     ) as client:
+        yield client, bucket, key
+
+
+# ============================================================================
+async def verify_storage_upload(storage, filename):
+    """ Test credentials and storage endpoint by uploading an empty test file """
+
+    async with get_s3_client(storage) as (client, bucket, key):
+        key += filename
         data = b""
+
         resp = await client.put_object(Bucket=bucket, Key=key, Body=data)
         assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+
+# ============================================================================
+async def get_presigned_url(archive, crawlfile, crawl_manager, duration=3600):
+    """ generate pre-signed url for crawl file """
+    if crawlfile.def_storage_name:
+        s3storage = await crawl_manager.get_default_storage(crawlfile.def_storage_name)
+
+    elif archive.storage.type == "s3":
+        s3storage = archive.storage
+
+    else:
+        raise Exception("No Default Storage Found, Invalid Storage Type")
+
+    async with get_s3_client(s3storage) as (client, bucket, key):
+        key += crawlfile.filename
+
+        presigned_url = await client.generate_presigned_url(
+            "get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=duration
+        )
+
+    print("presigned_url", presigned_url)
+    return presigned_url
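Note (illustrative, not part of the patch): a minimal usage sketch of the new `get_s3_client` helper, mirroring the calls `verify_storage_upload` and `get_presigned_url` make. The endpoint, bucket, and credentials below are hypothetical, and `storages` is assumed to be importable from the backend package:

import asyncio
from types import SimpleNamespace

from storages import get_s3_client  # assumes backend/ is on the import path

# Hypothetical storage object with the attributes get_s3_client() reads.
storage = SimpleNamespace(
    endpoint_url="https://s3.example.com/bucket-name/prefix/",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
)

async def demo():
    async with get_s3_client(storage) as (client, bucket, key):
        # upload an empty test object, as verify_storage_upload does
        resp = await client.put_object(Bucket=bucket, Key=key + "test.txt", Body=b"")
        assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200

        # create a pre-signed GET URL, as get_presigned_url does
        url = await client.generate_presigned_url(
            "get_object", Params={"Bucket": bucket, "Key": key + "test.txt"}, ExpiresIn=3600
        )
        print(url)

asyncio.run(demo())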