Modify page upload migration (#2400)

Related to #2396 

Changes to migration 0037:
- Re-adds pages in the migration itself rather than in a background job, to
avoid a race condition with later migrations (see the sketch below)
- Re-adds pages for all uploads in all orgs
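
For orientation, here is the new migrate_up flow condensed from the migration diff
below. This is a simplified sketch: progress logging and the collection-level
error handling are omitted, and all helper names come straight from the diff
(nothing here is invented).

async def migrate_up(self):
    if not self.background_job_ops or not self.page_ops or not self.coll_ops:
        return

    mdb_orgs = self.mdb["organizations"]
    mdb_crawls = self.mdb["crawls"]

    # Pages are re-added inline via page_ops for every upload in every org,
    # instead of queueing one background job per org.
    async for res in mdb_crawls.find({"type": "upload"}):
        upload = UploadedCrawl.from_dict(res)
        try:
            await self.page_ops.re_add_crawl_pages(upload.id, upload.oid)
        # pylint: disable=broad-exception-caught
        except Exception as err:
            print(f"Error adding pages for upload {upload.id}: {err}", flush=True)

    # Collections are then recalculated so their dates, counts, and tags
    # reflect the newly added pages.
    async for org_dict in mdb_orgs.find({}):
        org = Organization.from_dict(org_dict)
        await self.coll_ops.recalculate_org_collection_dates(org)
        await self.coll_ops.recalculate_org_collection_counts_tags(org)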

Fixes for re-adding pages for an org (see the sketch after this list):
- Ensure the org filter is actually applied!
- Fix an incorrect type annotation
- Remove distinct(); iterate over the crawls with a cursor instead, which is faster.
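
Condensed from the PageOps diff at the bottom of this commit, the fixed body of
re_add_all_crawl_pages looks roughly like this (all names come from the diff;
the old behavior is shown in comments for contrast):

# Before: no org filter, and distinct() returns every matching _id in a single
# response (subject to MongoDB's 16 MB result-document limit):
#     crawl_ids = await self.crawls.distinct("_id", {"finished": {"$ne": None}})

# After: the query is scoped to the org, and a cursor projected down to _id
# streams crawls in batches instead of materializing the whole ID list.
match_query: Dict[str, Union[object, UUID]] = {
    "oid": org.id,
    "finished": {"$ne": None},
}
if crawl_type in ("crawl", "upload"):
    match_query["type"] = crawl_type

async for crawl in self.crawls.find(match_query, projection={"_id": 1}):
    await self.re_add_crawl_pages(crawl.get("_id"), org.id)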

---------

Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
Tessa Walsh 2025-02-17 19:47:58 -05:00 committed by GitHub
parent 629cf7c404
commit 6c2d8c88c8
4 changed files with 44 additions and 43 deletions


@@ -2,9 +2,8 @@
 Migration 0037 -- upload pages
 """
-from uuid import UUID
 from btrixcloud.migrations import BaseMigration
+from btrixcloud.models import Organization, UploadedCrawl
 MIGRATION_VERSION = "0037"
@@ -19,54 +18,52 @@ class Migration(BaseMigration):
         self.background_job_ops = kwargs.get("background_job_ops")
         self.page_ops = kwargs.get("page_ops")
-    async def org_upload_pages_already_added(self, oid: UUID) -> bool:
-        """Check if upload pages have already been added for this org"""
-        if self.page_ops is None:
-            print(
-                f"page_ops missing, assuming pages need to be added for org {oid}",
-                flush=True,
-            )
-            return False
-        mdb_crawls = self.mdb["crawls"]
-        async for upload in mdb_crawls.find({"oid": oid, "type": "upload"}):
-            upload_id = upload["_id"]
-            _, total = await self.page_ops.list_pages(upload_id)
-            if total > 0:
-                return True
-        return False
+        self.coll_ops = kwargs.get("coll_ops")
     async def migrate_up(self):
         """Perform migration up.
         Start background jobs to parse uploads and add their pages to db
         """
-        if self.background_job_ops is None:
-            print(
-                "Unable to start background job, missing background_job_ops", flush=True
-            )
+        if not self.background_job_ops or not self.page_ops or not self.coll_ops:
+            print("Unable to start migration, missing ops", flush=True)
             return
         mdb_orgs = self.mdb["organizations"]
-        async for org in mdb_orgs.find():
-            oid = org["_id"]
+        mdb_crawls = self.mdb["crawls"]
-            pages_already_added = await self.org_upload_pages_already_added(oid)
+        uploads_query = {"type": "upload"}
-            if pages_already_added:
-                print(
-                    f"Skipping org {oid}, upload pages already added to db", flush=True
-                )
-                continue
+        # Re-add pages for all uploads
+        upload_count = await mdb_crawls.count_documents(uploads_query)
+        current_index = 1
+        async for res in mdb_crawls.find(uploads_query):
+            upload = UploadedCrawl.from_dict(res)
+            print(
+                f"Adding pages for upload {current_index}/{upload_count}",
+                flush=True,
+            )
             try:
-                await self.background_job_ops.create_re_add_org_pages_job(
-                    oid, crawl_type="upload"
-                )
+                await self.page_ops.re_add_crawl_pages(upload.id, upload.oid)
             # pylint: disable=broad-exception-caught
             except Exception as err:
                 print(
-                    f"Error starting background job to add upload pges to org {oid}: {err}",
+                    f"Error adding pages for upload {upload.id}: {err}",
                     flush=True,
                 )
+            current_index += 1
+        # Update collections to account for new pages
+        async for org_dict in mdb_orgs.find({}):
+            org = Organization.from_dict(org_dict)
+            try:
+                await self.coll_ops.recalculate_org_collection_dates(org)
+                await self.coll_ops.recalculate_org_collection_counts_tags(org)
+            # pylint: disable=broad-exception-caught
+            except Exception as err:
+                print(
+                    f"Error updating collections after adding pages for org {org.id}: {err}",
+                    flush=True,
+                )


@@ -1287,7 +1287,7 @@ class Page(BaseMongoModel):
     mime: Optional[str] = None
     filename: Optional[str] = None
     depth: Optional[int] = None
-    favIconUrl: Optional[AnyHttpUrl] = None
+    favIconUrl: Optional[str] = None
     isSeed: Optional[bool] = False
     # manual review
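
The commit message does not say why favIconUrl was loosened from AnyHttpUrl to
str. A plausible reason, stated here as an assumption rather than something
taken from the PR, is that favicon values captured during crawling are not
always http(s) URLs (data: URIs are common), and AnyHttpUrl validation rejects
those. A minimal standalone Pydantic sketch with hypothetical model names:

from typing import Optional
from pydantic import AnyHttpUrl, BaseModel, ValidationError

class StrictPage(BaseModel):  # hypothetical model mirroring the old field type
    favIconUrl: Optional[AnyHttpUrl] = None

class LoosePage(BaseModel):  # hypothetical model mirroring the new field type
    favIconUrl: Optional[str] = None

data_uri = "data:image/png;base64,iVBORw0KGgo="

try:
    StrictPage(favIconUrl=data_uri)
except ValidationError:
    print("data: URI rejected by AnyHttpUrl")

print(LoosePage(favIconUrl=data_uri).favIconUrl)  # accepted as a plain string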


@@ -70,7 +70,7 @@ def init_ops() -> Tuple[
         profile_ops,
     )
-    coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops)
+    coll_ops = CollectionOps(mdb, storage_ops, org_ops, event_webhook_ops)
     base_crawl_init = (
         mdb,


@@ -841,8 +841,6 @@ class PageOps:
             ]
         )
         print(f"Merged QA data from temp db {qa_temp_db_name}")
-        # async for data in qa_temp_db.find({}):
-        # print("qa data", data)
         assert await cursor.to_list() == []
         await qa_temp_db.drop()
@@ -855,13 +853,19 @@
         self, org: Organization, crawl_type: Optional[str] = None
     ):
         """Re-add pages for all crawls and uploads in org"""
-        match_query: Dict[str, object] = {"finished": {"$ne": None}}
+        match_query: Dict[str, Union[object, UUID]] = {
+            "oid": org.id,
+            "finished": {"$ne": None},
+        }
         if crawl_type in ("crawl", "upload"):
             match_query["type"] = crawl_type
-        crawl_ids = await self.crawls.distinct("_id", match_query)
-        for crawl_id in crawl_ids:
-            await self.re_add_crawl_pages(crawl_id, org.id)
+        count = 1
+        total = await self.crawls.count_documents(match_query)
+        async for crawl in self.crawls.find(match_query, projection={"_id": 1}):
+            print(f"Processing crawl {count} of {total}")
+            await self.re_add_crawl_pages(crawl.get("_id"), org.id)
+            count += 1

     async def get_qa_run_aggregate_counts(
         self,