forked from mongodb-partners/MongoDB_Fabric_Mirroring
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mongodb_generic_mirroring.py
73 lines (60 loc) · 2.13 KB
/
mongodb_generic_mirroring.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import os
import logging
from threading import Thread
import pymongo
from dotenv import load_dotenv
import json
from init_sync import init_sync
from listening import listening
def mirror():
load_dotenv()
log_level = os.getenv("LOG_LEVEL", "INFO")
logging.basicConfig(level=log_level)
if (
not os.getenv("MONGO_CONN_STR")
or not os.getenv("MONGO_DB_NAME")
or not os.getenv("MONGO_COLLECTION")
or not os.getenv("LZ_URL")
or not os.getenv("APP_ID")
or not os.getenv("SECRET")
or not os.getenv("TENANT_ID")
or not os.getenv("INIT_LOAD_BATCH_SIZE")
or not os.getenv("DELTA_SYNC_BATCH_SIZE")
):
raise ValueError("Missing environment variable.")
mongodb_coll_name = os.getenv("MONGO_COLLECTION")
collection_list = []
if mongodb_coll_name == "all":
collection_list = __get_all_collections()
elif mongodb_coll_name.startswith("["):
collection_list = json.loads(mongodb_coll_name)
elif isinstance(mongodb_coll_name, str):
collection_list = [mongodb_coll_name]
else:
raise ValueError(
'Invalid parameter value: mongodb_coll_name. "\
"Expected a list of collection names, a str of a single collection"\
" name, or "all" for all collections in the database.'
)
threads: list[Thread] = []
for col in collection_list:
Thread(target=init_sync, args=(col,)).start()
# init_thread = Thread(target=init_sync, args=(col,))
# init_thread.start()
# threads.append(init_thread)
Thread(target=listening, args=(col,)).start()
# listener_thread = Thread(target=listening, args=(col,))
# listener_thread.start()
# threads.append(listener_thread)
# for thread in threads:
# thread.join()
while True:
cmd = input()
if cmd.lower() == "quit":
os._exit(0)
def __get_all_collections() -> list[str]:
client = pymongo.MongoClient(os.getenv("MONGO_CONN_STR"))
db = client[os.getenv("MONGO_DB_NAME")]
return db.list_collection_names()
if __name__ == "__main__":
mirror()