fix multi db dump handling (#2488)
It was only handling the main db, but that didn't work
for the legal archive or the archived collection.

SDESK-6872
petrjasek authored Nov 9, 2023
1 parent 1913421 commit 2bde9fd
Showing 1 changed file with 67 additions and 55 deletions.
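In short: the dump and restore commands previously operated only on app.data.pymongo().db, so content living in the other configured Mongo databases (archived content, legal archive, content API) never made it into a dump. The fix threads a get_dbs() helper through both paths. A minimal sketch of the resulting shape, assuming a Flask app context and the same prefixes registered at the bottom of the diff:

    from flask import current_app as app

    # None resolves to the main database; the other prefixes name the extra
    # Mongo connections Superdesk configures alongside it.
    PREFIXES = [None, "ARCHIVED", "LEGAL_ARCHIVE", "CONTENTAPI_MONGO"]

    def get_dbs():
        return [app.data.pymongo(prefix=prefix).db for prefix in PREFIXES]

    def dump_all(dump_collection):
        # Before the fix only the main db was visited; now each db gets a pass.
        for db in get_dbs():
            for meta in db.list_collections():
                dump_collection(db, meta["name"])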
122 changes: 67 additions & 55 deletions superdesk/commands/data_manipulation.py
@@ -17,6 +17,8 @@
import platform
import shutil
import bz2
import pymongo.database

from multiprocessing import Process, Lock
from flask import current_app as app
import multiprocessing.synchronize
@@ -160,7 +162,7 @@ def parse_dump_file(
dump_file: Path,
single_file=True,
metadata_only: bool = False,
keep_existing: bool = False,
db: Optional[pymongo.database.Database] = None,
) -> dict:
"""Restore database from a single file
@@ -171,7 +173,8 @@
:return: metadata
"""
db = app.data.pymongo().db
if db is None:
db = app.data.pymongo().db
# we use a state machine to parse JSON progressively, and avoid memory issue for huge databases
if single_file:
collection_name = None
@@ -187,8 +190,6 @@
state = State.METADATA_OBJECT_EXPECTED
else:
state = State.COLLECTION_SQ_BRACKET
if not keep_existing:
collection.delete_many({})
metadata = {}
obj_buf = []
escaping = False
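The state-machine comment above is the key design point of parse_dump_file: the dump is read character by character, tracking string and escape state plus brace depth, so only one document at a time is materialized instead of the whole file. A stripped-down sketch of that technique, not the actual parser (which also handles collection names and the metadata object):

    import json

    def iter_array_objects(fp):
        # Yield top-level objects from a JSON array one at a time, tracking
        # brace depth and string/escape state like the parser above does.
        depth = 0
        in_string = False
        escaping = False
        buf = []
        while True:
            c = fp.read(1)
            if not c:
                return
            if escaping:
                escaping = False
            elif c == "\\":
                escaping = True
            elif c == '"':
                in_string = not in_string
            elif not in_string:
                if c == "{":
                    depth += 1
                elif c == "}":
                    depth -= 1
                    if depth == 0:
                        buf.append(c)
                        yield json.loads("".join(buf))
                        buf = []
                        continue
            if depth > 0:
                buf.append(c)

Restoring a collection then reduces to something like: for doc in iter_array_objects(fp): collection.insert_one(doc).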
@@ -258,8 +259,6 @@
collection = db.get_collection(collection_name)
inserted = 0
print(f"parsing collection {collection_name!r}")
if not keep_existing:
collection.delete_many({})
elif c == "\\":
escaping = True
else:
@@ -399,56 +398,59 @@ def run(
dest_dir_p = Path(dest_dir)
dest_dir_p.mkdir(parents=True, exist_ok=True)
dest_path = dest_dir_p / name
db = app.data.pymongo().db
collections_names = [c["name"] for c in db.list_collections()]
dump_msg = "dumping {name} ({idx}/{total})"
metadata = {
"started": now,
"description": "",
}
if description:
metadata["description"] = description
if single:
dest_path = dest_path.with_suffix(".json.bz2")
with open_dump(dest_path, "w") as f:
f.write(f"{{{dumps(METADATA_KEY)}: {dumps(metadata)},")
dbs = get_dbs()
for db in dbs:
collections_names = [c["name"] for c in db.list_collections()]
dump_msg = "dumping {name} ({idx}/{total})"
metadata = {
"db": db.name,
"started": now,
"description": "",
}
if description:
metadata["description"] = description
if single:
dest_path = dest_path.with_suffix(".json.bz2")
with open_dump(dest_path, "w") as f:
f.write(f"{{{dumps(METADATA_KEY)}: {dumps(metadata)},")
for idx, name in enumerate(collections_names):
if collections and name in collections:
continue
print(dump_msg.format(name=name, idx=idx + 1, total=len(collections_names)))
f.write(f"{dumps(name)}:[")
collection = db.get_collection(name)
cursor = collection.find()
count = cursor.count()
for doc_idx, doc in enumerate(cursor):
f.write(f"{dumps(doc)}")
if doc_idx < count - 1:
f.write(",")
f.write("]")
if idx < (len(collections_names) - 1):
f.write(",")
f.write("}")
else:
db_dest_path = dest_path / db.name
db_dest_path.mkdir(parents=True)
metadata_path = db_dest_path / f"{METADATA_KEY}.json.bz2"
with open_dump(metadata_path, "w") as f:
f.write(dumps(metadata))
for idx, name in enumerate(collections_names):
if collections and name in collections:
continue
print(dump_msg.format(name=name, idx=idx + 1, total=len(collections_names)))
f.write(f"{dumps(name)}:[")
col_path = db_dest_path / f"{name}.json.bz2"
collection = db.get_collection(name)
cursor = collection.find()
count = cursor.count()
for doc_idx, doc in enumerate(cursor):
f.write(f"{dumps(doc)}")
if doc_idx < count - 1:
f.write(",")
f.write("]")
if idx < (len(collections_names) - 1):
f.write(",")
f.write("}")
else:
dest_path.mkdir()
metadata_path = dest_path / f"{METADATA_KEY}.json.bz2"
with open_dump(metadata_path, "w") as f:
f.write(dumps(metadata))
for idx, name in enumerate(collections_names):
if collections and name in collections:
continue
print(dump_msg.format(name=name, idx=idx + 1, total=len(collections_names)))
col_path = dest_path / f"{name}.json.bz2"
collection = db.get_collection(name)
with open_dump(col_path, "w") as f:
f.write("[")
cursor = collection.find()
count = cursor.count()
for doc_idx, doc in enumerate(cursor):
f.write(f"{dumps(doc)}")
if doc_idx < count - 1:
f.write(",")
f.write("]")
print(f"database dumped at {dest_path}")
with open_dump(col_path, "w") as f:
f.write("[")
cursor = collection.find()
count = cursor.count()
for doc_idx, doc in enumerate(cursor):
f.write(f"{dumps(doc)}")
if doc_idx < count - 1:
f.write(",")
f.write("]")
print(f"database {db.name} dumped at {dest_path}")


class StorageRestore(superdesk.Command):
@@ -466,9 +468,12 @@ class StorageRestore(superdesk.Command):
]

def run(self, dump_path: Union[Path, str], keep_existing: bool = False, no_flush: bool = False) -> None:
self.keep_existing = keep_existing
archive_path = get_dest_path(dump_path)
print("💾 restoring archive")
if keep_existing is False:
for db in get_dbs():
db.client.drop_database(db)
app.init_indexes()
if archive_path.is_file():
self.restore_file(archive_path)
elif archive_path.is_dir():
@@ -486,12 +491,14 @@ def run(self, dump_path: Union[Path, str], keep_existing: bool = False, no_flush
print("🏁 All done")

def restore_file(self, archive_path: Path):
parse_dump_file(archive_path, keep_existing=self.keep_existing)
parse_dump_file(archive_path)

def restore_dir(self, archive_path: Path):
"""Restore database from a dump directory"""
for collection_path in archive_path.glob("*.json.bz2"):
parse_dump_file(collection_path, single_file=False, keep_existing=self.keep_existing)
for db in get_dbs():
print("RESTORE", db.name)
for collection_path in (archive_path / db.name).glob("*.json.bz2"):
parse_dump_file(collection_path, single_file=False, db=db)

print("👷 restore finished")

@@ -668,6 +675,7 @@ def run(self, record_file: Union[Path, str], force_db_reset: bool = False, skip_
if confirm.lower() != "y":
print("Restoration cancelled")
sys.exit(1)
print("RESTORE")
StorageRestore().run(keep_existing=False, no_flush=True, dump_path=base_dump_p)
print(f"{INFO} restoring record from {datetime.fromtimestamp(metadata['started']).isoformat()}")
description = metadata.get("description")
@@ -964,3 +972,7 @@ def create(self, docs, **kwargs):
superdesk.command("storage:restore-record", StorageRestoreRecord())
superdesk.command("storage:list", StorageList())
superdesk.command("storage:upgrade-dumps", StorageMigrateDumps())


def get_dbs():
return [app.data.pymongo(prefix=prefix).db for prefix in [None, "ARCHIVED", "LEGAL_ARCHIVE", "CONTENTAPI_MONGO"]]
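Since get_dbs() goes through the Flask current_app proxy, it only works inside an application context. A quick way to eyeball what a dump will cover; get_app is assumed to be the usual superdesk.factory entry point, so adjust to your deployment:

    from superdesk.factory import get_app  # assumed factory, your setup may differ
    from superdesk.commands.data_manipulation import get_dbs

    app = get_app()
    with app.app_context():
        for db in get_dbs():
            # one pymongo Database per configured prefix; two prefixes pointing
            # at the same database will simply show up twice
            print(db.name, sorted(c["name"] for c in db.list_collections()))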
