optimization: convert all uses of 'async for' to use iterator directly (#1229)

- optimization: convert all uses of 'async for' to iterate cursors directly instead of
converting them to lists, to avoid building unbounded-size lists in memory
- additional cursor.to_list() to 'async for' conversions for stats computation; simplify
crawlconfigs stats computation (see the sketch below)
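
As a minimal standalone sketch of the pattern applied throughout the diff below,
assuming the Motor async MongoDB driver (the client URL, database, collection, and
field names here are illustrative, not taken from this repo):

    # pip install motor
    import asyncio

    from motor.motor_asyncio import AsyncIOMotorClient


    async def total_crawl_size(crawls) -> int:
        # Before: cursor.to_list() materializes every matching document in memory
        # at once, so the list grows without bound with the size of the result set.
        #   results = await crawls.find({}).to_list(length=10_000)
        #   return sum(doc.get("size", 0) for doc in results)

        # After: 'async for' streams documents from the cursor one at a time,
        # keeping memory use roughly constant no matter how many documents match.
        total = 0
        async for doc in crawls.find({}):
            total += doc.get("size", 0)
        return total


    async def main() -> None:
        client = AsyncIOMotorClient("mongodb://localhost:27017")
        print(await total_crawl_size(client["example_db"]["crawls"]))


    if __name__ == "__main__":
        asyncio.run(main())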

---------
Co-authored-by: Tessa Walsh <tessa@bitarchivist.net>
Ilya Kreymer 2023-09-28 12:31:08 -07:00 committed by GitHub
parent cabf4ccc21
commit 7eac0fdf95
14 changed files with 64 additions and 107 deletions


@@ -322,9 +322,7 @@ class CollectionOps:
         total_size = 0
         tags = []

-        cursor = self.crawls.find({"collectionIds": collection_id})
-        crawls = await cursor.to_list(length=10_000)
-        for crawl in crawls:
+        async for crawl in self.crawls.find({"collectionIds": collection_id}):
             if crawl["state"] not in SUCCESSFUL_STATES:
                 continue
             crawl_count += 1


@@ -605,7 +605,7 @@ class CrawlConfigOps:
         total = await self.config_revs.count_documents(match_query)

         cursor = self.config_revs.find({"cid": cid}, skip=skip, limit=page_size)
-        results = await cursor.to_list(length=1000)
+        results = await cursor.to_list(length=page_size)
         revisions = [ConfigRevision.from_dict(res) for res in results]

         return revisions, total
@@ -704,8 +704,7 @@ class CrawlConfigOps:
         descriptions = [description for description in descriptions if description]

         first_seeds = set()
-        configs = [config async for config in self.crawl_configs.find({"oid": org.id})]
-        for config in configs:
+        async for config in self.crawl_configs.find({"oid": org.id}):
             first_seed = config["config"]["seeds"][0]["url"]
             first_seeds.add(first_seed)

@@ -802,39 +801,47 @@ async def stats_recompute_all(crawl_configs, crawls, cid: uuid.UUID):
     }

     match_query = {"cid": cid, "finished": {"$ne": None}}
-    cursor = crawls.find(match_query).sort("finished", pymongo.DESCENDING)
-    results = await cursor.to_list(length=10_000)
-    if results:
-        update_query["crawlCount"] = len(results)
-        update_query["crawlSuccessfulCount"] = len(
-            [res for res in results if res["state"] not in FAILED_STATES]
-        )
-
-        last_crawl = results[0]
-        last_crawl_finished = last_crawl.get("finished")
+    count = await crawls.count_documents(match_query)
+    if count:
+        update_query["crawlCount"] = count
+
+    total_size = 0
+    successful_count = 0
+
+    last_crawl: Optional[dict[str, object]] = None
+    last_crawl_size = 0
+
+    async for res in crawls.find(match_query).sort("finished", pymongo.DESCENDING):
+        files = res.get("files", [])
+        crawl_size = 0
+        for file in files:
+            crawl_size += file.get("size", 0)
+
+        total_size += crawl_size
+
+        if res["state"] not in FAILED_STATES:
+            successful_count += 1
+
+        last_crawl = res
+        last_crawl_size = crawl_size
+
+    if last_crawl:
+        update_query["totalSize"] = total_size
+        update_query["crawlSuccessfulCount"] = successful_count

         update_query["lastCrawlId"] = str(last_crawl.get("_id"))
         update_query["lastCrawlStartTime"] = last_crawl.get("started")
         update_query["lastStartedBy"] = last_crawl.get("userid")
         update_query["lastStartedByName"] = last_crawl.get("userName")
-        update_query["lastCrawlTime"] = last_crawl_finished
         update_query["lastCrawlState"] = last_crawl.get("state")
-        update_query["lastCrawlSize"] = sum(
-            file_.get("size", 0) for file_ in last_crawl.get("files", [])
-        )
+        update_query["lastCrawlSize"] = last_crawl_size
+
+        last_crawl_finished = last_crawl.get("finished")
+        update_query["lastCrawlTime"] = last_crawl_finished

         if last_crawl_finished:
             update_query["lastRun"] = last_crawl_finished

-    total_size = 0
-    for res in results:
-        files = res.get("files", [])
-        for file in files:
-            total_size += file.get("size", 0)
-    update_query["totalSize"] = total_size
-
     result = await crawl_configs.find_one_and_update(
         {"_id": cid, "inactive": {"$ne": True}},
         {"$set": update_query},


@@ -26,8 +26,7 @@ class Migration(BaseMigration):
         crawl_configs = self.mdb["crawl_configs"]

         # Return early if there are no configs
-        crawl_config_results = [res async for res in crawl_configs.find({})]
-        if not crawl_config_results:
+        if not await crawl_configs.count_documents({}):
             return

         utc_now_datetime = datetime.utcnow().replace(microsecond=0, tzinfo=None)
@@ -45,14 +44,9 @@ class Migration(BaseMigration):
         await crawl_configs.update_many({}, [{"$set": {"modified": "$created"}}])
         await crawl_configs.update_many({}, {"$set": {"rev": 0}})

-        # Return early if there are no crawls
-        crawl_results = [res async for res in crawls.find({})]
-        if not crawl_results:
-            return
-
         await crawls.update_many({}, {"$set": {"cid_rev": 0}})

-        for crawl_result in crawl_results:
+        async for crawl_result in crawls.find({}):
             config_result = await crawl_configs.find_one({"_id": crawl_result["cid"]})
             if not config_result:
                 continue


@@ -25,11 +25,8 @@ class Migration(BaseMigration):
         # Migrate workflows
         crawl_configs = self.mdb["crawl_configs"]

-        crawl_config_results = [res async for res in crawl_configs.find({})]
-        if not crawl_config_results:
-            return
-
-        for config_dict in crawl_config_results:
+        async for config_dict in crawl_configs.find({}):
             seeds_to_migrate = []
             seed_dicts = []
@@ -65,9 +62,8 @@ class Migration(BaseMigration):
         # Migrate seeds copied into crawls
         crawls = self.mdb["crawls"]

-        crawl_results = [res async for res in crawls.find({})]
-        for crawl_dict in crawl_results:
+        async for crawl_dict in crawls.find({}):
             seeds_to_migrate = []
             seed_dicts = []
@@ -102,15 +98,13 @@ class Migration(BaseMigration):
         )

         # Test migration
-        crawl_config_results = [res async for res in crawl_configs.find({})]
-        for config_dict in crawl_config_results:
+        async for config_dict in crawl_configs.find({}):
             config = CrawlConfig.from_dict(config_dict)
             for seed in config.config.seeds:
                 assert isinstance(seed, Seed)
                 assert seed.url

-        crawl_results = [res async for res in crawls.find({})]
-        for crawl_dict in crawl_results:
+        async for crawl_dict in crawls.find({}):
             crawl = Crawl.from_dict(crawl_dict)
             for seed in crawl.config.seeds:
                 assert isinstance(seed, Seed)


@@ -37,8 +37,7 @@ class Migration(BaseMigration):
                 {"schedule": {"$nin": ["", None]}},
             ]
         }
-        configs_to_update = [res async for res in crawl_configs.find(match_query)]
-        for config_dict in configs_to_update:
+        async for config_dict in crawl_configs.find(match_query):
             config = CrawlConfig.from_dict(config_dict)
             print(
                 f"Updating Crawl Config {config.id}: schedule: {config.schedule}, "


@@ -24,11 +24,7 @@ class Migration(BaseMigration):
         crawl_configs = self.mdb["crawl_configs"]
         crawls = self.mdb["crawls"]

-        configs = [res async for res in crawl_configs.find({"inactive": {"$ne": True}})]
-        if not configs:
-            return
-
-        for config in configs:
+        async for config in crawl_configs.find({"inactive": {"$ne": True}}):
             config_id = config["_id"]
             try:
                 await stats_recompute_all(crawl_configs, crawls, config_id)


@@ -24,11 +24,7 @@ class Migration(BaseMigration):
         crawls = self.mdb["crawls"]

         # Update workflows crawl stats to populate crawlSuccessfulCount
-        configs = [res async for res in crawl_configs.find({"inactive": {"$ne": True}})]
-        if not configs:
-            return
-
-        for config in configs:
+        async for config in crawl_configs.find({"inactive": {"$ne": True}}):
             config_id = config["_id"]
             try:
                 await stats_recompute_all(crawl_configs, crawls, config_id)


@@ -23,11 +23,7 @@ class Migration(BaseMigration):
         # pylint: disable=duplicate-code
         crawls = self.mdb["crawls"]

-        crawls_to_update = [res async for res in crawls.find({})]
-        if not crawls_to_update:
-            return
-
-        for crawl in crawls_to_update:
+        async for crawl in crawls.find({}):
             crawl_id = crawl["_id"]
             try:
                 await recompute_crawl_file_count_and_size(crawls, crawl_id)


@@ -22,11 +22,7 @@ class Migration(BaseMigration):
         # pylint: disable=duplicate-code
         coll_ops = CollectionOps(self.mdb, None, None, None)

-        colls_to_update = [res async for res in coll_ops.collections.find({})]
-        if not colls_to_update:
-            return
-
-        for coll in colls_to_update:
+        async for coll in coll_ops.collections.find({}):
             coll_id = coll["_id"]
             try:
                 await coll_ops.update_collection_counts_and_tags(coll_id)


@@ -22,11 +22,7 @@ class Migration(BaseMigration):
         crawls = self.mdb["crawls"]
         crawl_configs = self.mdb["crawl_configs"]

-        configs = [res async for res in crawl_configs.find({"inactive": {"$ne": True}})]
-        if not configs:
-            return
-
-        for config in configs:
+        async for config in crawl_configs.find({"inactive": {"$ne": True}}):
             config_id = config["_id"]
             try:
                 if not config.get("name"):


@@ -23,25 +23,22 @@ class Migration(BaseMigration):
         mdb_crawls = self.mdb["crawls"]
         mdb_profiles = self.mdb["profiles"]

-        orgs = [res async for res in mdb_orgs.find({})]
-        for org in orgs:
+        async for org in mdb_orgs.find({}):
             oid = org.get("_id")

             bytes_stored = 0

-            crawls = [res async for res in mdb_crawls.find({"oid": oid})]
-            for crawl in crawls:
+            async for crawl in mdb_crawls.find({"oid": oid}):
                 for crawl_file in crawl.get("files", []):
                     bytes_stored += crawl_file.get("size", 0)

-            profiles = [res async for res in mdb_profiles.find({"oid": oid})]
-            for profile in profiles:
+            async for profile in mdb_profiles.find({"oid": oid}):
                 profile_file = profile.get("resource")
                 if profile_file:
                     bytes_stored += profile_file.get("size", 0)

             try:
-                res = await mdb_orgs.find_one_and_update(
+                await mdb_orgs.find_one_and_update(
                     {"_id": oid}, {"$set": {"bytesStored": bytes_stored}}
                 )
             # pylint: disable=broad-exception-caught


@@ -28,8 +28,7 @@ class Migration(BaseMigration):
         # Update configmap for crawl configs that have non-zero timeout or scale > 1
         match_query = {"schedule": {"$nin": ["", None]}}

-        configs_to_update = [res async for res in crawl_configs.find(match_query)]
-        for config_dict in configs_to_update:
+        async for config_dict in crawl_configs.find(match_query):
             config = CrawlConfig.from_dict(config_dict)
             print(
                 f"Updating CronJob for Crawl Config {config.id}: schedule: {config.schedule}"


@@ -23,33 +23,24 @@ class Migration(BaseMigration):
         mdb_crawls = self.mdb["crawls"]
         mdb_profiles = self.mdb["profiles"]

-        orgs = [res async for res in mdb_orgs.find({})]
-        for org in orgs:
+        async for org in mdb_orgs.find({}):
             oid = org.get("_id")

             bytes_stored_crawls = 0
             bytes_stored_uploads = 0
             bytes_stored_profiles = 0

-            crawls = [
-                res
-                async for res in mdb_crawls.find(
-                    {"oid": oid, "type": {"$in": [None, "crawl"]}}
-                )
-            ]
-            for crawl in crawls:
+            async for crawl in mdb_crawls.find(
+                {"oid": oid, "type": {"$in": [None, "crawl"]}}
+            ):
                 for crawl_file in crawl.get("files", []):
                     bytes_stored_crawls += crawl_file.get("size", 0)

-            uploads = [
-                res async for res in mdb_crawls.find({"oid": oid, "type": "upload"})
-            ]
-            for upload in uploads:
+            async for upload in mdb_crawls.find({"oid": oid, "type": "upload"}):
                 for upload_file in upload.get("files", []):
                     bytes_stored_uploads += upload_file.get("size", 0)

-            profiles = [res async for res in mdb_profiles.find({"oid": oid})]
-            for profile in profiles:
+            async for profile in mdb_profiles.find({"oid": oid}):
                 profile_file = profile.get("resource")
                 if profile_file:
                     bytes_stored_profiles += profile_file.get("size", 0)
@@ -59,7 +50,7 @@ class Migration(BaseMigration):
             )

             try:
-                res = await mdb_orgs.find_one_and_update(
+                await mdb_orgs.find_one_and_update(
                     {"_id": oid},
                     {
                         "$set": {


@@ -349,9 +349,7 @@ class OrgOps:
         upload_count = 0
         page_count = 0

-        cursor = self.crawls_db.find({"oid": org.id})
-        items = await cursor.to_list(length=10_000)
-        for item in items:
+        async for item in self.crawls_db.find({"oid": org.id}):
             if item["state"] not in SUCCESSFUL_STATES:
                 continue
             archived_item_count += 1