Fixes #2406

Converts migration 0042 to launch a background job (parallelized across several pods) that migrates all crawls by optimizing their pages and setting `version: 2` on each crawl when complete. Also optimizes MongoDB queries for better performance.

Migration Improvements:

- Add `isMigrating` and `version` fields to `BaseCrawl`
- Add a new background job type for the migration, with an accompanying `migration_job.yaml` template that allows for parallelization
- Add a new API endpoint to launch this crawl migration job, and ensure superusers have list and retry endpoints that work with background jobs not tied to a specific org
- Rework background job models and methods now that not all background jobs are tied to a single org
- Ensure new crawls and uploads have `version` set to `2`
- Modify crawl and collection replay.json endpoints to only include the replay-optimization fields (`initialPages`, `pageQueryUrl`, `preloadResources`) if all relevant crawls/uploads have `version` set to `2`
- Remove `distinct` calls from migration pathways
- Consolidate collection recompute stats

Query Optimizations:

- Remove all uses of `$group` and `$facet`
- Optimize `/replay.json` endpoints to precompute `preload_resources` and avoid fetching the crawl list twice
- Optimize the `/collections` endpoint by not fetching resources
- Rename `/urls` -> `/pageUrlCounts` and avoid `$group`; instead, sort with an index (either by seed + ts, or by url) to get top matches
- Use `$gte` instead of `$regex` to get prefix matches on URL (see the sketch below)
- Use `$text` instead of `$regex` for text search on title
- Remove `total` from the `/pages` and `/pageUrlCounts` queries by not using `$facet`
- Frontend: only call `/pageUrlCounts` when the dialog is opened

---------

Co-authored-by: Ilya Kreymer <ikreymer@gmail.com>
Co-authored-by: Emma Segal-Grossman <hi@emma.cafe>
Co-authored-by: Ilya Kreymer <ikreymer@users.noreply.github.com>
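To make the two `$regex` replacements in the list above concrete, here is a rough sketch of the kind of queries involved. This is not the PR's actual code: the `pages` collection handle, the field names, and the helper functions are assumptions for illustration, and the PR may bound and sort the ranges differently.

```python
# Hedged sketch only; `pages`, `url`, `title`, and the helper names below are
# illustrative assumptions, not identifiers from the PR.


def url_prefix_query(prefix: str) -> dict:
    # Range filter standing in for a left-anchored $regex (^prefix): match
    # every url >= prefix and < prefix followed by a high code point, a
    # common approximation that a plain ascending index on `url` can serve
    # and that needs no regex escaping of user input.
    return {"url": {"$gte": prefix, "$lt": prefix + "\uffff"}}


def title_text_query(text: str) -> dict:
    # Uses $text, which requires a text index on `title`
    # (e.g. create_index([("title", "text")])), instead of an unanchored $regex.
    return {"$text": {"$search": text}}


async def top_url_matches(pages, prefix: str, limit: int = 10) -> list[dict]:
    # Sort on `url` so the same index that answers the range filter also
    # orders the results; no $group or $facet stage is needed to pick the
    # top matches.
    cursor = pages.find(url_prefix_query(prefix)).sort("url", 1).limit(limit)
    return [doc async for doc in cursor]
```

The design point in both cases matches the bullets above: answer the query from an index (a single-field index on `url`, a text index on `title`) and return a sorted, limited cursor rather than aggregating with `$group`/`$facet`.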
Migration 0037 (upload pages), Python, 69 lines (2.2 KiB):
```python
"""
Migration 0037 -- upload pages
"""

from btrixcloud.migrations import BaseMigration
from btrixcloud.models import Organization, UploadedCrawl


MIGRATION_VERSION = "0037"


class Migration(BaseMigration):
    """Migration class."""

    # pylint: disable=unused-argument
    def __init__(self, mdb, **kwargs):
        super().__init__(mdb, migration_version=MIGRATION_VERSION)

        self.background_job_ops = kwargs.get("background_job_ops")
        self.page_ops = kwargs.get("page_ops")
        self.coll_ops = kwargs.get("coll_ops")

    async def migrate_up(self):
        """Perform migration up.

        Start background jobs to parse uploads and add their pages to db
        """
        if not self.background_job_ops or not self.page_ops or not self.coll_ops:
            print("Unable to start migration, missing ops", flush=True)
            return

        mdb_orgs = self.mdb["organizations"]
        mdb_crawls = self.mdb["crawls"]

        uploads_query = {"type": "upload"}

        # Re-add pages for all uploads
        upload_count = await mdb_crawls.count_documents(uploads_query)
        current_index = 1

        async for res in mdb_crawls.find(uploads_query):
            upload = UploadedCrawl.from_dict(res)
            print(
                f"Adding pages for upload {current_index}/{upload_count}",
                flush=True,
            )

            try:
                await self.page_ops.re_add_crawl_pages(upload.id, upload.oid)
            # pylint: disable=broad-exception-caught
            except Exception as err:
                print(
                    f"Error adding pages for upload {upload.id}: {err}",
                    flush=True,
                )
            current_index += 1

        # Update collections to account for new pages
        async for org_dict in mdb_orgs.find({}):
            org = Organization.from_dict(org_dict)
            try:
                await self.coll_ops.recalculate_org_collection_stats(org)
            # pylint: disable=broad-exception-caught
            except Exception as err:
                print(
                    f"Error updating collections after adding pages for org {org.id}: {err}",
                    flush=True,
                )
```
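For context on how the new `version` field might gate the optimized replay fields described above, here is a hedged sketch. It is not the PR's implementation: the `crawls` collection handle, both function names, and the parameters are hypothetical; only the gating rule (include `initialPages`, `pageQueryUrl`, and `preloadResources` only when every relevant crawl/upload has `version: 2`) comes from the description.

```python
from typing import Any


async def all_crawls_migrated(crawls, crawl_ids: list[str]) -> bool:
    # True only if no relevant crawl/upload is still on the pre-migration
    # page model. `crawls` is an async (Motor) collection and the field
    # names are assumptions for illustration.
    not_migrated = await crawls.count_documents(
        {"_id": {"$in": crawl_ids}, "version": {"$ne": 2}}
    )
    return not_migrated == 0


async def add_replay_optimizations(
    crawls,
    crawl_ids: list[str],
    replay_json: dict[str, Any],
    initial_pages: list[dict[str, Any]],
    page_query_url: str,
    preload_resources: list[dict[str, Any]],
) -> dict[str, Any]:
    # Attach the optimized-replay fields only when every relevant crawl or
    # upload is on version 2; otherwise the endpoint keeps the unoptimized
    # replay.json shape so unmigrated data still replays correctly.
    if await all_crawls_migrated(crawls, crawl_ids):
        replay_json["initialPages"] = initial_pages
        replay_json["pageQueryUrl"] = page_query_url
        replay_json["preloadResources"] = preload_resources
    return replay_json
```

A missing `version` field also matches `{"$ne": 2}`, so crawls and uploads that have not yet been migrated automatically fall back to the unoptimized response.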