From fd7e81b8b7e92b5557802b64df30863aa6c28860 Mon Sep 17 00:00:00 2001
From: Ilya Kreymer
Date: Mon, 8 May 2023 14:02:20 -0700
Subject: [PATCH] stopping fix: backend fixes for #836 + prep for additional
 status fields (#837)

* stopping fix: backend fixes for #836
- set 'stopping' field on the crawl when it is being stopped (both in the db and on the k8s object)
- k8s: show 'stopping' as part of the crawljob object, update subchart
- set 'currCrawlStopping' on the workflow
- support both old and new browsertrix-crawler stopping keys
- tests: add tests for the new stopping state and for canceling a crawl (the stopping-crawl test is disabled on CI, currently failing there)
- catch redis connection errors when getting stats

operator: additional optimizations:
- run pvc removal as a background task
- catch any exceptions in the finalizer stage (e.g. if the db is down), return false until the finalizer completes
---
 backend/btrixcloud/crawlconfigs.py         |   2 +
 backend/btrixcloud/crawls.py               |  46 +++++----
 backend/btrixcloud/operator.py             |  34 ++++---
 backend/btrixcloud/utils.py                |   2 +-
 backend/test/conftest.py                   |  29 ++++++
 backend/test/test_stop_cancel_crawl.py     | 108 +++++++++++++++++++++
 chart/btrix-crds/templates/crawlerjob.yaml |   5 +
 chart/charts/btrix-crds-0.1.0.tgz          | Bin 965 -> 992 bytes
 chart/test/test.yaml                       |   2 +
 9 files changed, 196 insertions(+), 32 deletions(-)
 create mode 100644 backend/test/test_stop_cancel_crawl.py
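Reviewer note: a minimal sketch (not part of the diff below) of the graceful-stop
signal this patch standardizes on. The operator writes a new per-crawl
"{crawl_id}:stopping" key and, for backwards compatibility with older
browsertrix-crawler releases, still sets the legacy "crawl-stop" key. The helper
name and the redis_url/crawl_id values are illustrative; assumes an async Redis
client from redis-py >= 4.2 (redis.asyncio).

    # sketch only: signal a graceful stop for one crawl
    from redis import asyncio as aioredis

    async def signal_graceful_stop(redis_url: str, crawl_id: str) -> None:
        redis = aioredis.from_url(redis_url, decode_responses=True)
        # new per-crawl key, read by current browsertrix-crawler releases
        await redis.set(f"{crawl_id}:stopping", "1")
        # legacy key, kept so older crawlers also stop gracefully
        await redis.set("crawl-stop", "1")

    # e.g. asyncio.run(signal_graceful_stop("redis://localhost:6379/0", "crawl-abc"))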
diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index 82e5ef08..1da3d668 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -201,6 +201,7 @@ class CrawlConfigOut(CrawlConfig):
     currCrawlStartTime: Optional[datetime]
     currCrawlState: Optional[str]
     currCrawlSize: Optional[int] = 0
+    currCrawlStopping: Optional[bool] = False
 
     profileName: Optional[str]
 
@@ -637,6 +638,7 @@ class CrawlConfigOps:
         crawlconfig.currCrawlStartTime = crawl.started
         crawlconfig.currCrawlState = crawl.state
         crawlconfig.currCrawlSize = crawl.stats.get("size", 0) if crawl.stats else 0
+        crawlconfig.currCrawlStopping = crawl.stopping
 
     async def get_crawl_config_out(self, cid: uuid.UUID, org: Organization):
         """Return CrawlConfigOut, including state of currently running crawl, if active
diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index bdbeddc3..362358b5 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -27,16 +27,13 @@
 from .users import User
 from .utils import dt_now, ts_now, get_redis_crawl_stats, parse_jsonl_error_messages
 
-CRAWL_STATES = (
-    "starting",
-    "running",
-    "stopping",
-    "complete",
-    "canceled",
-    "partial_complete",
-    "timed_out",
-    "failed",
-)
+RUNNING_STATES = ("running", "pending-wait", "generate-wacz", "uploading-wacz")
+
+RUNNING_AND_STARTING_STATES = ("starting", "waiting", *RUNNING_STATES)
+
+NON_RUNNING_STATES = ("complete", "canceled", "partial_complete", "timed_out", "failed")
+
+ALL_CRAWL_STATES = (*NON_RUNNING_STATES, *RUNNING_AND_STARTING_STATES)
 
 
 # ============================================================================
@@ -104,6 +101,8 @@ class Crawl(CrawlConfigCore):
 
     errors: Optional[List[str]] = []
 
+    stopping: Optional[bool] = False
+
 
 # ============================================================================
 class CrawlOut(Crawl):
@@ -155,6 +154,8 @@ class ListCrawlOut(BaseMongoModel):
     seedCount: Optional[int] = 0
     errors: Optional[List[str]]
 
+    stopping: Optional[bool] = False
+
 
 # ============================================================================
 class CrawlCompleteIn(BaseModel):
@@ -236,11 +237,11 @@ class CrawlOps:
             query["userid"] = userid
 
         if running_only:
-            query["state"] = {"$in": ["running", "starting", "stopping"]}
+            query["state"] = {"$in": list(RUNNING_AND_STARTING_STATES)}
 
         # Override running_only if state list is explicitly passed
         if state:
-            validated_states = [value for value in state if value in CRAWL_STATES]
+            validated_states = [value for value in state if value in ALL_CRAWL_STATES]
             query["state"] = {"$in": validated_states}
 
         if crawl_id:
@@ -430,9 +431,13 @@ class CrawlOps:
 
         # if running, get stats directly from redis
        # more responsive, saves db update in operator
-        if crawl.state == "running":
-            redis = await self.get_redis(crawl.id)
-            crawl.stats = await get_redis_crawl_stats(redis, crawl.id)
+        if crawl.state in RUNNING_STATES:
+            try:
+                redis = await self.get_redis(crawl.id)
+                crawl.stats = await get_redis_crawl_stats(redis, crawl.id)
+            # redis not available, ignore
+            except exceptions.ConnectionError:
+                pass
 
         return crawl
 
@@ -596,11 +601,12 @@ class CrawlOps:
             )
 
             if result.get("success"):
-                # for canceletion, just set to canceled immediately if succeeded
-                await self.update_crawl_state(
-                    crawl_id, "stopping" if graceful else "canceled"
-                )
-                return {"success": True}
+                if graceful:
+                    await self.crawls.find_one_and_update(
+                        {"_id": crawl_id, "oid": org.id},
+                        {"$set": {"stopping": True}},
+                    )
+                return result
 
         except Exception as exc:
             # pylint: disable=raise-missing-from
diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py
index 7bd28ee0..eb09d909 100644
--- a/backend/btrixcloud/operator.py
+++ b/backend/btrixcloud/operator.py
@@ -77,7 +77,7 @@ class CrawlSpec(BaseModel):
 class CrawlStatus(BaseModel):
     """status from k8s CrawlJob object"""
 
-    state: str = "waiting"
+    state: str = "new"
     pagesFound: int = 0
     pagesDone: int = 0
     size: str = ""
@@ -149,10 +149,15 @@ class BtrixOperator(K8sAPI):
         # if finalizing, crawl is being deleted
         if data.finalizing:
             # if not yet finished, assume it was canceled, mark as such
+            print(f"Finalizing crawl {crawl_id}, finished {status.finished}")
             if not status.finished:
-                await self.cancel_crawl(redis_url, crawl_id, cid, status, "canceled")
+                finalize = await self.cancel_crawl(
+                    redis_url, crawl_id, cid, status, "canceled"
+                )
+            else:
+                finalize = True
 
-            return await self.finalize_crawl(crawl_id, status, data.related)
+            return await self.finalize_crawl(crawl_id, status, data.related, finalize)
 
         if status.finished:
             return await self.handle_finished_delete_if_needed(crawl_id, status, spec)
@@ -184,7 +189,7 @@ class BtrixOperator(K8sAPI):
         has_crawl_children = STS in data.children and crawl_sts in data.children[STS]
         if has_crawl_children:
             status = await self.sync_crawl_state(redis_url, crawl, status)
-        else:
+        elif not status.finished:
             status.state = "starting"
 
         if status.finished:
@@ -278,9 +283,15 @@ class BtrixOperator(K8sAPI):
 
     # pylint: disable=too-many-arguments
     async def cancel_crawl(self, redis_url, crawl_id, cid, status, state):
-        """immediately cancel crawl with specified state"""
-        redis = await self._get_redis(redis_url)
-        await self.mark_finished(redis, crawl_id, cid, status, state)
+        """immediately cancel crawl with specified state
+        return true if db mark_finished update succeeds"""
+        try:
+            redis = await self._get_redis(redis_url)
+            await self.mark_finished(redis, crawl_id, cid, status, state)
+            return True
+        # pylint: disable=bare-except
+        except:
+            return False
 
     def _done_response(self, status, finalized=False):
         """done response for removing crawl"""
@@ -290,17 +301,15 @@ class BtrixOperator(K8sAPI):
             "finalized": finalized,
         }
 
-    async def finalize_crawl(self, crawl_id, status, related):
+    async def finalize_crawl(self, crawl_id, status, related, finalized=True):
         """ensure crawl id ready for deletion
         return with finalized state"""
 
         pvcs = list(related[PVC].keys())
         if pvcs:
             print("Deleting PVCs", pvcs)
-            await self.delete_pvc(crawl_id)
+            asyncio.create_task(self.delete_pvc(crawl_id))
             finalized = False
 
-        else:
-            finalized = True
-
         return self._done_response(status, finalized)
 
@@ -391,6 +400,9 @@ class BtrixOperator(K8sAPI):
         )
 
         if crawl.stopping:
+            print("Graceful Stop")
+            await redis.set(f"{crawl.id}:stopping", "1")
+            # backwards compatibility with older crawler
             await redis.set("crawl-stop", "1")
 
         # optimization: don't update db once crawl is already running
diff --git a/backend/btrixcloud/utils.py b/backend/btrixcloud/utils.py
index 10cff21e..7f2b7b1e 100644
--- a/backend/btrixcloud/utils.py
+++ b/backend/btrixcloud/utils.py
@@ -47,7 +47,7 @@ async def get_redis_crawl_stats(redis, crawl_id):
     pages_done = await redis.llen(f"{crawl_id}:d")
     pages_found = await redis.scard(f"{crawl_id}:s")
 
-    archive_size = await redis.hvals("crawl-size")
+    archive_size = await redis.hvals(f"{crawl_id}:size")
     archive_size = sum(int(x) for x in archive_size)
 
     stats = {"found": pages_found, "done": pages_done, "size": archive_size}
diff --git a/backend/test/conftest.py b/backend/test/conftest.py
index 0506db9f..b8b1e6fb 100644
--- a/backend/test/conftest.py
+++ b/backend/test/conftest.py
@@ -174,6 +174,30 @@ def crawler_userid(crawler_auth_headers):
     return r.json()["id"]
 
 
+@pytest.fixture(scope="session")
+def _crawler_create_config_only(crawler_auth_headers, default_org_id):
+    # Create crawl config without running it.
+    crawl_data = {
+        "runNow": False,
+        "name": "Crawler User Test Crawl",
+        "description": "crawler test crawl",
+        "config": {
+            "seeds": [{"url": "https://webrecorder.net/"}],
+            "pageExtraDelay": 5,
+            "limit": 4,
+        },
+    }
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/",
+        headers=crawler_auth_headers,
+        json=crawl_data,
+    )
+    data = r.json()
+
+    global _crawler_config_id
+    _crawler_config_id = data["added"]
+
+
 @pytest.fixture(scope="session")
 def crawler_crawl_id(crawler_auth_headers, default_org_id):
     # Start crawl.
@@ -239,6 +263,11 @@ def crawler_config_id(crawler_crawl_id):
     return _crawler_config_id
 
 
+@pytest.fixture(scope="session")
+def crawler_config_id_only(_crawler_create_config_only):
+    return _crawler_config_id
+
+
 @pytest.fixture(scope="session")
 def sample_crawl_data():
     return {
diff --git a/backend/test/test_stop_cancel_crawl.py b/backend/test/test_stop_cancel_crawl.py
new file mode 100644
index 00000000..a6a42f58
--- /dev/null
+++ b/backend/test/test_stop_cancel_crawl.py
@@ -0,0 +1,108 @@
+import requests
+import time
+import os
+import pytest
+
+from .conftest import API_PREFIX
+
+crawl_id = None
+
+
+def get_crawl(org_id, auth_headers, crawl_id):
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/replay.json",
+        headers=auth_headers,
+    )
+    assert r.status_code == 200
+    return r.json()
+
+
+def test_start_crawl_to_cancel(
+    default_org_id, crawler_config_id_only, crawler_auth_headers
+):
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{crawler_config_id_only}/run",
+        headers=crawler_auth_headers,
+    )
+    assert r.status_code == 200
+    data = r.json()
+
+    assert data.get("started")
+
+    global crawl_id
+    crawl_id = data["started"]
+
+
+def test_cancel_crawl(default_org_id, crawler_auth_headers):
+    data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+    while data["state"] == "starting":
+        time.sleep(5)
+        data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/cancel",
+        headers=crawler_auth_headers,
+    )
+    data = r.json()
+    assert data["success"] == True
+
+    data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+
+    while data["state"] == "running":
+        data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+
+    assert data["state"] == "canceled"
+    assert data["stopping"] == False
+
+    assert len(data["resources"]) == 0
+
+
+@pytest.mark.skipif(os.environ.get("CI") is not None, reason="Skip Test on CI")
+def test_start_crawl_to_stop(
+    default_org_id, crawler_config_id_only, crawler_auth_headers
+):
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{crawler_config_id_only}/run",
+        headers=crawler_auth_headers,
+    )
+    assert r.status_code == 200
+    data = r.json()
+
+    assert data.get("started")
+
+    global crawl_id
+    crawl_id = data["started"]
+
+
+@pytest.mark.skipif(os.environ.get("CI") is not None, reason="Skip Test on CI")
+def test_stop_crawl(default_org_id, crawler_config_id_only, crawler_auth_headers):
+    data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+    while data["state"] == "starting":
+        time.sleep(5)
+        data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawls/{crawl_id}/stop",
+        headers=crawler_auth_headers,
+    )
+    data = r.json()
+    assert data["success"] == True
+
+    # test crawl
+    data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+    assert data["stopping"] == True
+
+    # test workflow
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{default_org_id}/crawlconfigs/{crawler_config_id_only}",
+        headers=crawler_auth_headers,
+    )
+    assert r.json()["currCrawlStopping"] == True
+
+    while data["state"] == "running":
+        data = get_crawl(default_org_id, crawler_auth_headers, crawl_id)
+
+    assert data["state"] == "partial_complete"
+    assert data["stopping"] == True
+
+    assert len(data["resources"]) == 1
diff --git a/chart/btrix-crds/templates/crawlerjob.yaml b/chart/btrix-crds/templates/crawlerjob.yaml
index 6fd3dd09..ac49416f 100644
--- a/chart/btrix-crds/templates/crawlerjob.yaml
+++ b/chart/btrix-crds/templates/crawlerjob.yaml
@@ -66,6 +66,11 @@ spec:
         jsonPath: .status.finished
         description: "if set, time crawl has finished"
 
+      - name: Stopping
+        type: boolean
+        jsonPath: .spec.stopping
+        description: "if set, crawl is being stopped"
+
       - name: Files Added
         type: integer
         jsonPath: .status.filesAdded
diff --git a/chart/charts/btrix-crds-0.1.0.tgz b/chart/charts/btrix-crds-0.1.0.tgz
index 831b1f1f27c99da5856f27e743a84a9dac66685d..cf45cb258c483c82c264e099a8cf7dba8080978e 100644
GIT binary patch
delta 946
(base85-encoded binary delta not reproduced)
delta 919
(base85-encoded binary delta not reproduced)
diff --git a/chart/test/test.yaml b/chart/test/test.yaml
index 9bf5fb3b..5792331e 100644
--- a/chart/test/test.yaml
+++ b/chart/test/test.yaml
@@ -4,6 +4,8 @@ backend_pull_policy: "Never"
 
 frontend_pull_policy: "Never"
 
+operator_resync_seconds: 5
+
 mongo_auth:
   # specify either username + password (for local mongo)
   username: root
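Reviewer note: a condensed sketch of the client-side flow the new tests
exercise: request a graceful stop, confirm 'stopping' is set, then poll until
the crawl reaches a final state. API_PREFIX, org_id, crawl_id, and headers are
placeholders, and stop_and_wait is an illustrative helper, not part of this
patch.

    import time
    import requests

    API_PREFIX = "http://localhost:30870/api"  # placeholder

    def stop_and_wait(org_id, crawl_id, headers):
        # request graceful stop; the backend now sets crawl.stopping = True
        r = requests.post(
            f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/stop", headers=headers
        )
        assert r.json()["success"]

        # poll replay.json until the crawl leaves the running state
        while True:
            data = requests.get(
                f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/replay.json",
                headers=headers,
            ).json()
            if data["state"] != "running":
                # a gracefully stopped crawl ends as "partial_complete"
                return data
            time.sleep(5)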