storage: support loading default storage from crawl managers (#126)

support s3-compatible presigning with default storage
backend support for #120
Ilya Kreymer 2022-01-31 11:22:03 -08:00 committed by GitHub
parent 523b557eac
commit f569125a3d
3 changed files with 103 additions and 38 deletions
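
The commit backs the presigning support with a new get_presigned_url helper in the Storage API (third file below): it resolves either a named default storage via the crawl manager or the archive's own custom S3 storage, then presigns a GET URL for the crawl file. The following is a minimal, self-contained sketch of the same presigning call with aiobotocore; the endpoint, bucket, key, and credential values are made-up examples, not part of this commit.

    from aiobotocore.session import get_session


    async def presign_example():
        """Presign a one-hour GET url against an s3-compatible endpoint."""
        session = get_session()
        async with session.create_client(
            "s3",
            region_name="",
            endpoint_url="https://minio.example.com",  # example s3-compatible endpoint
            aws_access_key_id="EXAMPLE_ACCESS_KEY",     # example credentials only
            aws_secret_access_key="EXAMPLE_SECRET_KEY",
        ) as client:
            # same presigning call used by the new get_presigned_url helper
            return await client.generate_presigned_url(
                "get_object",
                Params={"Bucket": "crawl-data", "Key": "example.wacz"},
                ExpiresIn=3600,
            )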

View File

@@ -150,15 +150,6 @@ class DockerManager:
         """ set crawl ops """
         self.crawl_ops = ops
 
-    async def get_storage(self, storage):
-        """ get storage from existing storage object or reference """
-        # pylint: disable=no-else-return
-        if storage.type == "default":
-            return self.storages[storage.name], storage.path
-
-        else:
-            return storage, ""
-
     async def check_storage(self, storage_name, is_default=False):
         """ check if storage_name is valid storage """
         # if not default, don't validate
@@ -168,6 +159,10 @@ class DockerManager:
         # if default, ensure name is in default storages list
         return self.storages[storage_name]
 
+    async def get_default_storage(self, name):
+        """ return default storage by name """
+        return self.storages[name]
+
     async def update_archive_storage(self, aid, userid, storage):
         """ No storage kept for docker manager """
@@ -188,7 +183,7 @@ class DockerManager:
         if storage.type == "default":
             labels["btrix.def_storage_path"] = storage.path
 
-        storage, storage_path = await self.get_storage(storage)
+        storage, storage_path = await self._get_storage_and_path(storage)
 
         if crawlconfig.crawlTimeout:
             labels["btrix.timeout"] = str(crawlconfig.crawlTimeout)
@@ -304,7 +299,7 @@ class DockerManager:
             archive = await self.archive_ops.get_archive_by_id(
                 uuid.UUID(labels["btrix.archive"])
             )
-            storage, storage_path = await self.get_storage(archive.storage)
+            storage, storage_path = await self._get_storage_and_path(archive.storage)
 
         # pylint: disable=broad-except
         except Exception as exc:
@@ -403,6 +398,17 @@ class DockerManager:
         return name
 
+    async def _get_storage_and_path(self, storage):
+        """get storage from existing storage object or reference
+        return storage and storage_path (for default storage)
+        """
+        # pylint: disable=no-else-return
+        if storage.type == "default":
+            return self.storages[storage.name], storage.path
+
+        else:
+            return storage, ""
+
     async def _add_config_to_volume(self, volume, path, data):
         """Add crawl config to volume, requires tar'ing the data,
         creating a dummy container and then deleting"""

View File

@@ -9,6 +9,7 @@ import base64
 from kubernetes_asyncio import client, config, watch
 from kubernetes_asyncio.stream import WsApiClient
 
+from archives import S3Storage
 from crawls import Crawl, CrawlOut, CrawlFile
@@ -35,7 +36,7 @@ class K8SManager:
         self.batch_beta_api = client.BatchV1beta1Api()
 
         self.namespace = namespace
-        self._default_storage_endpoints = {}
+        self._default_storages = {}
 
         self.crawler_image = os.environ["CRAWLER_IMAGE"]
         self.crawler_image_pull_policy = os.environ["CRAWLER_PULL_POLICY"]
@@ -88,19 +89,11 @@ class K8SManager:
             print(exc)
 
     # pylint: disable=unused-argument
-    async def get_storage(self, storage_name, is_default=False):
-        """Check if storage_name is valid by checking existing secret
-        is_default flag ignored"""
-        try:
-            return await self.core_api.read_namespaced_secret(
-                f"storage-{storage_name}",
-                namespace=self.namespace,
-            )
-        except Exception:
-            # pylint: disable=broad-except,raise-missing-from
-            raise Exception(f"Storage {storage_name} not found")
-
-        return None
+    async def check_storage(self, storage_name, is_default=False):
+        """Check if storage is valid by trying to get the storage secret
+        Will throw if not valid, otherwise return True"""
+        await self._get_storage_secret(storage_name)
+        return True
 
     async def update_archive_storage(self, aid, userid, storage):
         """Update storage by either creating a per-archive secret, if using custom storage
@@ -172,7 +165,7 @@ class K8SManager:
             "btrix.crawlconfig": cid,
         }
 
-        await self.get_storage(storage_name)
+        await self.check_storage(storage_name)
 
         # Create Config Map
         config_map = self._create_config_map(crawlconfig, labels)
@@ -356,13 +349,32 @@ class K8SManager:
 
     async def get_default_storage_access_endpoint(self, name):
         """ Get access_endpoint for default storage """
-        if name not in self._default_storage_endpoints:
-            storage_secret = await self.get_storage(name, is_default=True)
-
-            self._default_storage_endpoints[name] = base64.standard_b64decode(
-                storage_secret.data["STORE_ACCESS_ENDPOINT_URL"]
-            ).decode()
-
-        return self._default_storage_endpoints[name]
+        return (await self.get_default_storage(name)).access_endpoint_url
+
+    async def get_default_storage(self, name):
+        """ get default storage """
+        if name not in self._default_storages:
+            storage_secret = await self._get_storage_secret(name)
+
+            access_endpoint_url = self._secret_data(
+                storage_secret, "STORE_ACCESS_ENDPOINT_URL"
+            )
+            endpoint_url = self._secret_data(storage_secret, "STORE_ENDPOINT_URL")
+            access_key = self._secret_data(storage_secret, "STORE_ACCESS_KEY")
+            secret_key = self._secret_data(storage_secret, "STORE_SECRET_KEY")
+
+            self._default_storages[name] = S3Storage(
+                access_key=access_key,
+                secret_key=secret_key,
+                endpoint_url=endpoint_url,
+                access_endpoint_url=access_endpoint_url,
+            )
+
+        return self._default_storages[name]
+
+    async def _secret_data(self, storage, name):
+        """ decode secret storage data """
+        return base64.standard_b64decode(storage.data[name]).decode()
 
     async def get_running_crawl(self, name, aid):
         """Get running crawl (job) with given name, or none
@@ -524,6 +536,20 @@ class K8SManager:
         return config_map
 
+    # pylint: disable=unused-argument
+    async def _get_storage_secret(self, storage_name):
+        """ Check if storage_name is valid by checking existing secret """
+        try:
+            return await self.core_api.read_namespaced_secret(
+                f"storage-{storage_name}",
+                namespace=self.namespace,
+            )
+        except Exception:
+            # pylint: disable=broad-except,raise-missing-from
+            raise Exception(f"Storage {storage_name} not found")
+
+        return None
+
     # pylint: disable=no-self-use
     def _get_schedule_suspend_run_now(self, crawlconfig):
         """ get schedule/suspend/run_now data based on crawlconfig """

View File

@@ -3,6 +3,7 @@ Storage API
 """
 from typing import Union
 from urllib.parse import urlsplit
+from contextlib import asynccontextmanager
 
 from fastapi import Depends, HTTPException
 from aiobotocore.session import get_session
@@ -52,20 +53,19 @@ def init_storages_api(archive_ops, crawl_manager, user_dep):
 
 # ============================================================================
-async def verify_storage_upload(storage, filename):
-    """ Test credentials and storage endpoint by uploading an empty test file """
-
+@asynccontextmanager
+async def get_s3_client(storage):
+    """ context manager for s3 client"""
     if not storage.endpoint_url.endswith("/"):
         storage.endpoint_url += "/"
 
-    session = get_session()
-
     parts = urlsplit(storage.endpoint_url)
     bucket, key = parts.path[1:].split("/", 1)
-    key += filename
 
     endpoint_url = parts.scheme + "://" + parts.netloc
 
+    session = get_session()
+
     async with session.create_client(
         "s3",
         region_name="",
@@ -73,6 +73,39 @@ async def verify_storage_upload(storage, filename):
         aws_access_key_id=storage.access_key,
         aws_secret_access_key=storage.secret_key,
     ) as client:
+        yield client, bucket, key
+
+
+# ============================================================================
+async def verify_storage_upload(storage, filename):
+    """ Test credentials and storage endpoint by uploading an empty test file """
+    async with get_s3_client(storage) as (client, bucket, key):
+        key += filename
+
         data = b""
         resp = await client.put_object(Bucket=bucket, Key=key, Body=data)
         assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+
+# ============================================================================
+async def get_presigned_url(archive, crawlfile, crawl_manager, duration=3600):
+    """ generate pre-signed url for crawl file """
+    if crawlfile.def_storage_name:
+        s3storage = await crawl_manager.get_default_storage(crawlfile.def_storage_name)
+
+    elif archive.storage.type == "s3":
+        s3storage = archive.storage
+
+    else:
+        raise Exception("No Default Storage Found, Invalid Storage Type")
+
+    async with get_s3_client(s3storage) as (client, bucket, key):
+        key += crawlfile.filename
+
+        presigned_url = await client.generate_presigned_url(
+            "get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=duration
+        )
+
+        print("presigned_url", presigned_url)
+
+    return presigned_url