CrawlConfig migration and crawl stats query optimization (#633)
* Drop crawl stats fields from CrawlConfig and add migration
* Remove migrate_down from BaseMigration
* Get crawl stats from optimized mongo query
This commit is contained in:
parent
1dea7ecdf9
commit
e2f359c352
@ -139,15 +139,6 @@ class CrawlConfig(BaseMongoModel):
|
|||||||
|
|
||||||
crawlAttemptCount: Optional[int] = 0
|
crawlAttemptCount: Optional[int] = 0
|
||||||
|
|
||||||
# These fields would ideally be in CrawlConfigOut, but are being
|
|
||||||
# kept here to prevent the need for a migration. Eventually, we
|
|
||||||
# may want to add a migration and move them, as these values are
|
|
||||||
# now generated dynamically in API endpoints as needed.
|
|
||||||
crawlCount: Optional[int] = 0
|
|
||||||
lastCrawlId: Optional[str]
|
|
||||||
lastCrawlTime: Optional[datetime]
|
|
||||||
lastCrawlState: Optional[str]
|
|
||||||
|
|
||||||
newId: Optional[UUID4]
|
newId: Optional[UUID4]
|
||||||
oldId: Optional[UUID4]
|
oldId: Optional[UUID4]
|
||||||
inactive: Optional[bool] = False
|
inactive: Optional[bool] = False
|
||||||
@ -165,6 +156,11 @@ class CrawlConfigOut(CrawlConfig):
|
|||||||
profileName: Optional[str]
|
profileName: Optional[str]
|
||||||
userName: Optional[str]
|
userName: Optional[str]
|
||||||
|
|
||||||
|
crawlCount: Optional[int] = 0
|
||||||
|
lastCrawlId: Optional[str]
|
||||||
|
lastCrawlTime: Optional[datetime]
|
||||||
|
lastCrawlState: Optional[str]
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
class CrawlConfigIdNameOut(BaseMongoModel):
|
class CrawlConfigIdNameOut(BaseMongoModel):
|
||||||
@ -436,21 +432,13 @@ class CrawlConfigOps:
|
|||||||
|
|
||||||
async def _annotate_with_crawl_stats(self, crawlconfig: CrawlConfigOut):
|
async def _annotate_with_crawl_stats(self, crawlconfig: CrawlConfigOut):
|
||||||
"""Annotate crawlconfig with information about associated crawls"""
|
"""Annotate crawlconfig with information about associated crawls"""
|
||||||
crawls = await self.crawl_ops.list_crawls(cid=crawlconfig.id)
|
crawl_stats = await self.crawl_ops.get_latest_crawl_and_count_by_config(
|
||||||
|
cid=crawlconfig.id
|
||||||
crawlconfig.crawlCount = len(crawls)
|
)
|
||||||
|
crawlconfig.crawlCount = crawl_stats["crawl_count"]
|
||||||
finished_crawls = [crawl for crawl in crawls if crawl.finished]
|
crawlconfig.lastCrawlId = crawl_stats["last_crawl_id"]
|
||||||
if not finished_crawls:
|
crawlconfig.lastCrawlTime = crawl_stats["last_crawl_finished"]
|
||||||
return crawlconfig
|
crawlconfig.lastCrawlState = crawl_stats["last_crawl_state"]
|
||||||
|
|
||||||
sorted_crawls = sorted(finished_crawls, key=lambda crawl: crawl.finished)
|
|
||||||
last_crawl = sorted_crawls[-1]
|
|
||||||
|
|
||||||
crawlconfig.lastCrawlId = str(last_crawl.id)
|
|
||||||
crawlconfig.lastCrawlTime = last_crawl.finished
|
|
||||||
crawlconfig.lastCrawlState = last_crawl.state
|
|
||||||
|
|
||||||
return crawlconfig
|
return crawlconfig
|
||||||
|
|
||||||
async def get_crawl_config_out(self, cid: uuid.UUID, org: Organization):
|
async def get_crawl_config_out(self, cid: uuid.UUID, org: Organization):
|
||||||
|
@ -291,6 +291,28 @@ class CrawlOps:
|
|||||||
|
|
||||||
return await self._resolve_crawl_refs(crawl, org)
|
return await self._resolve_crawl_refs(crawl, org)
|
||||||
|
|
||||||
|
async def get_latest_crawl_and_count_by_config(self, cid: str):
    """Get crawl statistics for a crawl_config with id cid.

    Only crawls whose ``finished`` field is set and which are not marked
    ``inactive`` are taken into account.

    Returns a dict with the keys ``crawl_count``, ``last_crawl_id``,
    ``last_crawl_finished`` and ``last_crawl_state``. When no crawl
    matches, the count is 0 and the other values are None.
    """
    stats = {
        "crawl_count": 0,
        "last_crawl_id": None,
        "last_crawl_finished": None,
        "last_crawl_state": None,
    }

    match_query = {"cid": cid, "finished": {"$ne": None}, "inactive": {"$ne": True}}

    # Only the most recently finished crawl is needed, so fetch a single
    # document instead of materializing every matching crawl.
    cursor = self.crawls.find(match_query).sort("finished", pymongo.DESCENDING)
    results = await cursor.to_list(length=1)
    if not results:
        return stats

    # Count on the server: taking len() of a list limited to 1000 entries
    # would cap the reported count at 1000.
    stats["crawl_count"] = await self.crawls.count_documents(match_query)

    last_crawl = Crawl.from_dict(results[0])
    stats["last_crawl_id"] = str(last_crawl.id)
    stats["last_crawl_finished"] = last_crawl.finished
    stats["last_crawl_state"] = last_crawl.state

    return stats
|
||||||
|
|
||||||
async def _resolve_crawl_refs(
|
async def _resolve_crawl_refs(
|
||||||
self, crawl: Union[CrawlOut, ListCrawlOut], org: Optional[Organization]
|
self, crawl: Union[CrawlOut, ListCrawlOut], org: Optional[Organization]
|
||||||
):
|
):
|
||||||
|
@ -0,0 +1,68 @@
|
|||||||
|
"""
|
||||||
|
BaseMigration class to subclass in each migration module
|
||||||
|
"""
|
||||||
|
from pymongo.errors import OperationFailure
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMigration:
    """Base Migration class.

    Holds the database handle and the version this migration brings the
    database to, and provides the shared version bookkeeping and run
    logic. Subclasses implement ``migrate_up``.
    """

    def __init__(self, mdb, migration_version="0001"):
        self.mdb = mdb
        self.migration_version = migration_version

    async def get_db_version(self):
        """Get current db version from database.

        Returns None when the ``version`` collection holds no record or
        the record has no ``version`` key.
        """
        version_record = await self.mdb["version"].find_one()
        if not version_record:
            return None
        return version_record.get("version")

    async def set_db_version(self):
        """Set db version to migration_version."""
        version_collection = self.mdb["version"]
        # Empty filter + upsert: updates the existing record, or creates
        # one if the collection is still empty.
        await version_collection.find_one_and_update(
            {}, {"$set": {"version": self.migration_version}}, upsert=True
        )

    async def migrate_up_needed(self):
        """Verify migration up is needed and return boolean indicator."""
        db_version = await self.get_db_version()
        print(f"Current database version before migration: {db_version}", flush=True)
        print(f"Migration available to apply: {self.migration_version}", flush=True)
        # Databases from prior to migrations will not have a version set.
        if not db_version:
            return True
        # Plain comparison of the stored value with migration_version; for
        # strings this is lexicographic, so versions are expected to be
        # zero-padded to a common width (e.g. "0001", "0002").
        return db_version < self.migration_version

    async def migrate_up(self):
        """Perform migration up.

        Raises NotImplementedError here; subclasses provide the actual
        migration.
        """
        raise NotImplementedError(
            "Not implemented in base class - implement in subclass"
        )

    async def run(self):
        """Run migrations.

        Returns True when the migration was applied and the db version
        updated, False when no migration was needed or the migration
        raised an OperationFailure.
        """
        if not await self.migrate_up_needed():
            print("No migration to apply - skipping", flush=True)
            return False

        print("Performing migration up", flush=True)
        try:
            await self.migrate_up()
            # Only record the new version once migrate_up has completed.
            await self.set_db_version()
        except OperationFailure as err:
            print(
                f"Error running migration {self.migration_version}: {err}",
                flush=True,
            )
            return False

        print(f"Database successfully migrated to {self.migration_version}", flush=True)
        return True
|
@ -5,10 +5,14 @@ import os
|
|||||||
|
|
||||||
from pymongo.errors import OperationFailure
|
from pymongo.errors import OperationFailure
|
||||||
|
|
||||||
|
from btrixcloud.migrations import BaseMigration
|
||||||
from btrixcloud.k8s.k8sapi import K8sAPI
|
from btrixcloud.k8s.k8sapi import K8sAPI
|
||||||
|
|
||||||
|
|
||||||
class Migration:
|
MIGRATION_VERSION = "0001"
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(BaseMigration):
|
||||||
"""Migration class."""
|
"""Migration class."""
|
||||||
|
|
||||||
COLLECTIONS_AID_TO_OID = [
|
COLLECTIONS_AID_TO_OID = [
|
||||||
@ -19,42 +23,8 @@ class Migration:
|
|||||||
"profiles",
|
"profiles",
|
||||||
]
|
]
|
||||||
|
|
||||||
MIGRATION_VERSION = "0001"
|
def __init__(self, mdb, migration_version=MIGRATION_VERSION):
|
||||||
|
super().__init__(mdb, migration_version)
|
||||||
def __init__(self, mdb):
|
|
||||||
self.mdb = mdb
|
|
||||||
|
|
||||||
async def get_db_version(self):
|
|
||||||
"""Get current db version from database."""
|
|
||||||
db_version = None
|
|
||||||
version_collection = self.mdb["version"]
|
|
||||||
version_record = await version_collection.find_one()
|
|
||||||
if not version_record:
|
|
||||||
return db_version
|
|
||||||
try:
|
|
||||||
db_version = version_record["version"]
|
|
||||||
except KeyError:
|
|
||||||
pass
|
|
||||||
return db_version
|
|
||||||
|
|
||||||
async def set_db_version(self):
|
|
||||||
"""Set db version to version_number."""
|
|
||||||
version_collection = self.mdb["version"]
|
|
||||||
await version_collection.find_one_and_update(
|
|
||||||
{}, {"$set": {"version": self.MIGRATION_VERSION}}, upsert=True
|
|
||||||
)
|
|
||||||
|
|
||||||
async def migrate_up_needed(self):
|
|
||||||
"""Verify migration up is needed and return boolean indicator."""
|
|
||||||
db_version = await self.get_db_version()
|
|
||||||
print(f"Current database version before migration: {db_version}")
|
|
||||||
print(f"Migration available to apply: {self.MIGRATION_VERSION}")
|
|
||||||
# Databases from prior to migrations will not have a version set.
|
|
||||||
if not db_version:
|
|
||||||
return True
|
|
||||||
if db_version < self.MIGRATION_VERSION:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def migrate_up(self):
|
async def migrate_up(self):
|
||||||
"""Perform migration up."""
|
"""Perform migration up."""
|
||||||
@ -98,25 +68,3 @@ class Migration:
|
|||||||
await k8s_api_instance.core_api.patch_namespaced_config_map(
|
await k8s_api_instance.core_api.patch_namespaced_config_map(
|
||||||
name=item_name, namespace=crawler_namespace, body=item
|
name=item_name, namespace=crawler_namespace, body=item
|
||||||
)
|
)
|
||||||
|
|
||||||
def migrate_down(self):
|
|
||||||
"""Perform migration down."""
|
|
||||||
raise NotImplementedError("Downward migrations not yet added")
|
|
||||||
|
|
||||||
async def run(self):
|
|
||||||
"""Run migrations."""
|
|
||||||
if await self.migrate_up_needed():
|
|
||||||
print("Performing migration up", flush=True)
|
|
||||||
try:
|
|
||||||
await self.migrate_up()
|
|
||||||
await self.set_db_version()
|
|
||||||
except OperationFailure as err:
|
|
||||||
print(f"Error running migration {self.MIGRATION_VERSION}: {err}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
else:
|
|
||||||
print("No migration to apply - skipping", flush=True)
|
|
||||||
return False
|
|
||||||
|
|
||||||
print(f"Database successfully migrated to {self.MIGRATION_VERSION}", flush=True)
|
|
||||||
return True
|
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
"""
|
||||||
|
Migration 0002 - Dropping CrawlConfig crawl stats
|
||||||
|
"""
|
||||||
|
from btrixcloud.migrations import BaseMigration
|
||||||
|
|
||||||
|
|
||||||
|
MIGRATION_VERSION = "0002"
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(BaseMigration):
    """Migration class."""

    def __init__(self, mdb, migration_version=MIGRATION_VERSION):
        super().__init__(mdb, migration_version)

    async def migrate_up(self):
        """Perform migration up.

        Drop the crawl statistics fields (crawlCount, lastCrawlId,
        lastCrawlTime, lastCrawlState) from crawl_configs collection
        documents, as these are now generated dynamically as needed in
        API endpoints.
        """
        crawl_configs = self.mdb["crawl_configs"]
        # Remove all four fields in a single pass over the collection
        # rather than issuing one full-collection update per field.
        await crawl_configs.update_many(
            {},
            {
                "$unset": {
                    "crawlCount": 1,
                    "lastCrawlId": 1,
                    "lastCrawlTime": 1,
                    "lastCrawlState": 1,
                }
            },
        )
|
Loading…
Reference in New Issue
Block a user