migration improvements: (#1228)
* migration improvements + rerunning migrations: (fixes #1227) - avoid starting some workers while migration is still running - ensure workers that aren't performing migration await for migration to complete - backend will not be valid until migration is run * allow rerunning migration from specified version via --set rerun_from_migration=<VERSION> (replaces rerun_last_migration)
This commit is contained in:
		
							parent
							
								
									1f74f03447
								
							
						
					
					
						commit
						86a424af93
					
				| @ -52,15 +52,14 @@ def init_db(): | ||||
| 
 | ||||
| 
 | ||||
| # ============================================================================ | ||||
| async def ping_db(mdb, db_inited): | ||||
| async def ping_db(mdb): | ||||
|     """run in loop until db is up, set db_inited['inited'] property to true""" | ||||
|     print("Waiting DB", flush=True) | ||||
|     while True: | ||||
|         try: | ||||
|             result = await mdb.command("ping") | ||||
|             assert result.get("ok") | ||||
|             db_inited["inited"] = True | ||||
|             print("DB Ready!") | ||||
|             print("DB reached") | ||||
|             break | ||||
|         # pylint: disable=broad-exception-caught | ||||
|         except Exception: | ||||
| @ -88,13 +87,14 @@ async def update_and_prepare_db( | ||||
|     - Create/update default org | ||||
| 
 | ||||
|     """ | ||||
|     await ping_db(mdb, db_inited) | ||||
|     await ping_db(mdb) | ||||
|     print("Database setup started", flush=True) | ||||
|     if await run_db_migrations(mdb, user_manager): | ||||
|         await drop_indexes(mdb) | ||||
|     await create_indexes(org_ops, crawl_ops, crawl_config_ops, coll_ops, invite_ops) | ||||
|     await user_manager.create_super_user() | ||||
|     await org_ops.create_default_org() | ||||
|     db_inited["inited"] = True | ||||
|     print("Database updated and ready", flush=True) | ||||
| 
 | ||||
| 
 | ||||
| @ -141,6 +141,26 @@ async def run_db_migrations(mdb, user_manager): | ||||
|     return migrations_run | ||||
| 
 | ||||
| 
 | ||||
| # ============================================================================ | ||||
| async def await_db_and_migrations(mdb, db_inited): | ||||
|     """await that db is available and any migrations in progress finish""" | ||||
|     await ping_db(mdb) | ||||
|     print("Database setup started", flush=True) | ||||
| 
 | ||||
|     base_migration = BaseMigration(mdb, CURR_DB_VERSION) | ||||
|     while await base_migration.migrate_up_needed(ignore_rerun=True): | ||||
|         version = await base_migration.get_db_version() | ||||
|         print( | ||||
|             f"Waiting for migrations to finish, DB at {version}, latest {CURR_DB_VERSION}", | ||||
|             flush=True, | ||||
|         ) | ||||
| 
 | ||||
|         await asyncio.sleep(5) | ||||
| 
 | ||||
|     db_inited["inited"] = True | ||||
|     print("Database updated and ready", flush=True) | ||||
| 
 | ||||
| 
 | ||||
| # ============================================================================ | ||||
| async def drop_indexes(mdb): | ||||
|     """Drop all database indexes.""" | ||||
|  | ||||
| @ -10,7 +10,7 @@ from fastapi import FastAPI, HTTPException | ||||
| from fastapi.responses import JSONResponse | ||||
| from fastapi.routing import APIRouter | ||||
| 
 | ||||
| from .db import init_db, ping_db, update_and_prepare_db | ||||
| from .db import init_db, await_db_and_migrations, update_and_prepare_db | ||||
| 
 | ||||
| from .emailsender import EmailSender | ||||
| from .invites import init_invites | ||||
| @ -157,7 +157,7 @@ def main(): | ||||
|             ) | ||||
|         ) | ||||
|     else: | ||||
|         asyncio.create_task(ping_db(mdb, db_inited)) | ||||
|         asyncio.create_task(await_db_and_migrations(mdb, db_inited)) | ||||
| 
 | ||||
|     app.include_router(org_ops.router) | ||||
| 
 | ||||
|  | ||||
| @ -15,7 +15,7 @@ class BaseMigration: | ||||
|     def __init__(self, mdb, migration_version="0001"): | ||||
|         self.mdb = mdb | ||||
|         self.migration_version = migration_version | ||||
|         self.rerun_migration = os.environ.get("RERUN_LAST_MIGRATION") == "1" | ||||
|         self.rerun_from_migration = os.environ.get("RERUN_FROM_MIGRATION") | ||||
| 
 | ||||
|     async def get_db_version(self): | ||||
|         """Get current db version from database.""" | ||||
| @ -37,7 +37,7 @@ class BaseMigration: | ||||
|             {}, {"$set": {"version": self.migration_version}}, upsert=True | ||||
|         ) | ||||
| 
 | ||||
|     async def migrate_up_needed(self): | ||||
|     async def migrate_up_needed(self, ignore_rerun=False): | ||||
|         """Verify migration up is needed and return boolean indicator.""" | ||||
|         db_version = await self.get_db_version() | ||||
|         print(f"Current database version before migration: {db_version}") | ||||
| @ -48,8 +48,12 @@ class BaseMigration: | ||||
|         if db_version < self.migration_version: | ||||
|             return True | ||||
| 
 | ||||
|         if self.rerun_migration and db_version == self.migration_version: | ||||
|             print("Rerunning last migration") | ||||
|         if ( | ||||
|             not ignore_rerun | ||||
|             and self.rerun_from_migration | ||||
|             and self.rerun_from_migration <= self.migration_version | ||||
|         ): | ||||
|             print(f"Rerunning migrations from: {self.migration_version}") | ||||
|             return True | ||||
|         return False | ||||
| 
 | ||||
|  | ||||
| @ -34,7 +34,7 @@ data: | ||||
| 
 | ||||
|   IDLE_TIMEOUT: "{{ .Values.profile_browser_idle_seconds | default 60 }}" | ||||
| 
 | ||||
|   RERUN_LAST_MIGRATION: "{{ .Values.rerun_last_migration }}" | ||||
|   RERUN_FROM_MIGRATION: "{{ .Values.rerun_from_migration }}" | ||||
| 
 | ||||
|   PRESIGN_DURATION_MINUTES: "{{ .Values.storage_presign_duration_minutes | default 60 }}" | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user