browsertrix/backend/btrixcloud/k8s/utils.py
2022-10-19 21:47:34 -07:00

52 lines
1.4 KiB
Python

""" k8s utils """
import os
import yaml
from kubernetes_asyncio.utils import create_from_dict
def get_templates_dir():
"""return directory containing templates for loading"""
return os.path.join(os.path.dirname(__file__), "templates")
async def create_from_yaml(k8s_client, doc, namespace):
"""init k8s objects from yaml"""
yml_document_all = yaml.safe_load_all(doc)
k8s_objects = []
for yml_document in yml_document_all:
created = await create_from_dict(
k8s_client, yml_document, verbose=False, namespace=namespace
)
k8s_objects.append(created)
return k8s_objects
async def send_signal_to_pods(core_api_ws, namespace, pods, signame, func=None):
"""send signal to all pods"""
command = ["kill", "-s", signame, "1"]
signaled = False
try:
for pod in pods:
if func and not func(pod.metadata):
continue
print(f"Sending {signame} to {pod.metadata.name}", flush=True)
await core_api_ws.connect_get_namespaced_pod_exec(
pod.metadata.name,
namespace=namespace,
command=command,
stdout=True,
)
signaled = True
# pylint: disable=broad-except
except Exception as exc:
print(f"Send Signal Error: {exc}", flush=True)
return signaled