fix stopping crawls + profiles (fixes #298) (#309)

- regression fix: ensure the correct signals are sent to stop a crawl (SIGUSR1 + SIGTERM)
- crawl stop: if the crawl is still running after 60 seconds, allow the stop signals to be resent
- regression fix: ensure crawling with a profile works in k8s
Ilya Kreymer, 2022-09-09 18:31:43 -07:00 (committed by GitHub)
parent 1216f6cb66, commit 2531a03e41
5 changed files with 24 additions and 17 deletions
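
For background on the first bullet: the job process signals the crawler twice, SIGUSR1 then SIGTERM. A minimal, illustrative sketch of how a crawler process might interpret the pair; the handler semantics here are an assumption about the crawler, not taken from this repo:

import signal
import sys

# Illustrative only: assumes SIGUSR1 means "finish gracefully and upload all
# data" and SIGTERM means "begin shutting down". The real browsertrix-crawler
# handlers may differ.
graceful = False

def on_sigusr1(signum, frame):
    # Mark the crawl for graceful completion before shutdown begins.
    global graceful
    graceful = True

def on_sigterm(signum, frame):
    # Shut down; the exit code reflects whether graceful mode was set first.
    sys.exit(0 if graceful else 1)

signal.signal(signal.SIGUSR1, on_sigusr1)
signal.signal(signal.SIGTERM, on_sigterm)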

File 1 of 5:

@@ -6,6 +6,7 @@ import signal
 import os
 import json
 import uuid
+import time
 
 from datetime import datetime
 from abc import ABC, abstractmethod
@@ -18,6 +19,10 @@ from .db import init_db
 from .crawls import Crawl, CrawlFile, CrawlCompleteIn, dt_now
 
 
+# Seconds before allowing another shutdown attempt
+SHUTDOWN_ATTEMPT_WAIT = 60
+
+
 # =============================================================================
 # pylint: disable=too-many-instance-attributes,bare-except
 class CrawlJob(ABC):
@@ -30,8 +35,6 @@ class CrawlJob(ABC):
     def __init__(self):
         super().__init__()
 
-        self.shutdown_pending = False
-
         _, mdb = init_db()
         self.archives = mdb["archives"]
         self.crawls = mdb["crawls"]
@@ -59,7 +62,7 @@
         self._cached_params = {}
         self._files_added = False
 
-        self._graceful_shutdown_pending = False
+        self._graceful_shutdown_pending = 0
         self._delete_pending = False
 
         params = {
@@ -299,16 +302,21 @@ class CrawlJob(ABC):
     async def graceful_shutdown(self):
         """ attempt to graceful stop the crawl, all data should be uploaded """
-        if self._graceful_shutdown_pending:
+        if (
+            self._graceful_shutdown_pending
+            and (time.time() - self._graceful_shutdown_pending) < SHUTDOWN_ATTEMPT_WAIT
+        ):
             print("Already trying to stop crawl gracefully", flush=True)
             return {"success": False, "error": "already_stopping"}
 
         print("Stopping crawl", flush=True)
-        if not await self._send_shutdown_signal():
+        if not await self._send_shutdown_signal("SIGUSR1"):
             return {"success": False, "error": "unreachable"}
 
-        self._graceful_shutdown_pending = True
+        await self._send_shutdown_signal("SIGTERM")
+
+        self._graceful_shutdown_pending = time.time()
 
         await self.update_crawl(state="stopping")
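
The guard changes from a boolean to a timestamp: a second stop request within SHUTDOWN_ATTEMPT_WAIT seconds is rejected, while a later one passes so the signals can be resent to a crawl that is still running. A standalone sketch of the pattern (names mirror the diff; the surrounding class is elided):

import time

# Mirrors SHUTDOWN_ATTEMPT_WAIT from the diff above.
SHUTDOWN_ATTEMPT_WAIT = 60

class StopGuard:
    """Sketch of the timestamp guard behind _graceful_shutdown_pending."""

    def __init__(self):
        # 0 means no attempt yet; otherwise the time.time() of the last attempt.
        self.pending = 0

    def may_attempt(self):
        if self.pending and (time.time() - self.pending) < SHUTDOWN_ATTEMPT_WAIT:
            return False  # a recent attempt is still in flight; don't resend
        self.pending = time.time()
        return True  # ok to (re)send SIGUSR1 + SIGTERM

A crawl that is still running a minute after the first stop request can therefore be signaled again, which is the behavior the second bullet in the commit message describes.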
@@ -403,7 +411,7 @@ class CrawlJob(ABC):
         """ set number of replicas """
 
     @abstractmethod
-    async def _send_shutdown_signal(self):
+    async def _send_shutdown_signal(self, signame):
         """ gracefully shutdown crawl """
 
     @property

File 2 of 5:

@@ -55,17 +55,14 @@ class K8SCrawlJob(K8SJobMixin, CrawlJob):
         except:
             return None
 
-    async def _send_shutdown_signal(self):
+    async def _send_shutdown_signal(self, signame):
         pods = await self.core_api.list_namespaced_pod(
             namespace=self.namespace,
             label_selector=f"crawl={self.job_id},role=crawler",
         )
 
         return await send_signal_to_pods(
-            self.core_api_ws,
-            self.namespace,
-            pods.items,
-            "SIGINT",
+            self.core_api_ws, self.namespace, pods.items, signame
         )
 
     # pylint: disable=line-too-long

File 3 of 5:

@@ -259,9 +259,9 @@ spec:
            - /tmp/crawl-config.json
            - --redisStoreUrl
            - {{ redis_url }}
-          {%- if profile_filename %}
+          {%- if env.PROFILE_FILENAME %}
            - --profile
-           - "@profiles/{{ profile_filename }}"
+           - "@profiles/{{ env.PROFILE_FILENAME }}"
           {%- endif %}
           volumeMounts:
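
This is the profile regression: the template previously tested a bare profile_filename variable, but the k8s job receives the profile via an environment variable, so the condition never fired. Assuming these templates are rendered with Jinja2 and that env exposes the job's environment (an assumption about the rendering setup), the fix behaves like this:

import os
from jinja2 import Template  # assumes Jinja2 is the template engine here

SNIPPET = """
{%- if env.PROFILE_FILENAME %}
- --profile
- "@profiles/{{ env.PROFILE_FILENAME }}"
{%- endif %}
"""

# With PROFILE_FILENAME set on the job, the --profile args are emitted;
# without it, the block renders to nothing. The filename is a made-up example.
os.environ.setdefault("PROFILE_FILENAME", "profile.tar.gz")
print(Template(SNIPPET).render(env=os.environ))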

File 4 of 5:

@@ -34,6 +34,8 @@ async def send_signal_to_pods(core_api_ws, namespace, pods, signame, func=None):
         if func and not func(pod.metadata):
             continue
 
+        print(f"Sending {signame} to {pod.metadata.name}", flush=True)
+
         await core_api_ws.connect_get_namespaced_pod_exec(
             pod.metadata.name,
             namespace=namespace,
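
The hunk above is truncated mid-call. For reference, a sketch of how such a helper could complete, assuming the kubernetes websocket client's connect_get_namespaced_pod_exec and that the signal is delivered by exec-ing kill against PID 1 in each crawler pod (the command and the return convention are assumptions):

async def send_signal_to_pods(core_api_ws, namespace, pods, signame, func=None):
    """Sketch: signal each matching pod; return True if any pod was signaled."""
    signaled = False
    for pod in pods:
        if func and not func(pod.metadata):
            continue

        print(f"Sending {signame} to {pod.metadata.name}", flush=True)
        # Assumption: deliver the signal by running `kill` against PID 1
        # (the crawler process) inside the pod.
        await core_api_ws.connect_get_namespaced_pod_exec(
            pod.metadata.name,
            namespace=namespace,
            command=["kill", "-s", signame, "1"],
            stdout=True,
            stderr=True,
            stdin=False,
            tty=False,
        )
        signaled = True

    return signaled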

File 5 of 5:

@@ -54,15 +54,15 @@ class SwarmCrawlJob(SwarmJobMixin, CrawlJob):
             None, runner.get_service, f"crawl-{self.job_id}-0_crawler"
         )
 
-    async def _send_shutdown_signal(self):
+    async def _send_shutdown_signal(self, signame):
         loop = asyncio.get_running_loop()
         count = 0
         for num in range(0, self.scale):
             name = f"crawl-{self.job_id}-{num}_crawler"
-            print(f"Sending SIGABRT to {name}", flush=True)
+            print(f"Sending {signame} to {name}", flush=True)
             count += await loop.run_in_executor(
-                None, runner.ping_containers, name, "SIGABRT"
+                None, runner.ping_containers, name, signame
             )
 
         # for now, assume success if at least 1 container is signaled
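
runner.ping_containers is the project's swarm helper; only its (name, signame) signature is visible here. A hypothetical stand-in using the Docker CLI shows the intended effect, sending an arbitrary signal to every container of the named service and reporting how many were reached (the mechanism is an assumption, not the repo's implementation):

import subprocess

def ping_containers(name, signame):
    """Hypothetical stand-in: send `signame` to all running containers whose
    name matches `name`, returning the number of containers signaled."""
    container_ids = subprocess.run(
        ["docker", "ps", "-q", "--filter", f"name={name}"],
        capture_output=True, text=True, check=True,
    ).stdout.split()
    for cid in container_ids:
        subprocess.run(["docker", "kill", "--signal", signame, cid], check=True)
    return len(container_ids)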