From 3e1efa94afa214264ea69b257088ac2a2c5c62a3 Mon Sep 17 00:00:00 2001
From: Rob Svirskas
Date: Mon, 17 Jul 2023 11:53:28 -0400
Subject: [PATCH] Added updates to ddb_published_versioned MongoDB collection

---
 bin/update_dynamodb_published_versioned.py | 154 +++++++--------------
 1 file changed, 52 insertions(+), 102 deletions(-)

diff --git a/bin/update_dynamodb_published_versioned.py b/bin/update_dynamodb_published_versioned.py
index 602323e..6baa0bd 100644
--- a/bin/update_dynamodb_published_versioned.py
+++ b/bin/update_dynamodb_published_versioned.py
@@ -4,19 +4,15 @@
 
 import argparse
 from datetime import datetime
-import json
-import os
+from operator import attrgetter
 import re
 import sys
 import time
-from types import SimpleNamespace
 import boto3
-import colorlog
 import inquirer
 import MySQLdb
-from pymongo import MongoClient
-import requests
 from tqdm import tqdm
+import jrc_common.jrc_common as JRC
 import neuronbridge_lib as NB
 
 # pylint: disable=W0703, E1101
@@ -33,6 +29,7 @@
 FAILURE = {}
 KEYS = {}
 KNOWN_PPP = {}
+DDB_NB = {}
 
 
 def terminate_program(msg=None):
@@ -47,37 +44,6 @@ def terminate_program(msg=None):
     sys.exit(-1 if msg else 0)
 
 
-def call_responder(server, endpoint):
-    ''' Call a responder
-        Keyword arguments:
-          server: server
-          endpoint: REST endpoint
-        Returns:
-          JSON response
-    '''
-    url = ((getattr(getattr(REST, server), "url") if server else "") if "REST" in globals() \
-           else (os.environ.get('CONFIG_SERVER_URL') if server else "")) + endpoint
-    try:
-        req = requests.get(url, timeout=10)
-    except requests.exceptions.RequestException as err:
-        terminate_program(err)
-    if req.status_code == 200:
-        return req.json()
-    terminate_program(f"Could not get response from {url}: {req.text}")
-    return False
-
-
-def create_config_object(config):
-    """ Convert the JSON received from a configuration to an object
-        Keyword arguments:
-          config: configuration name
-        Returns:
-          Configuration object
-    """
-    data = (call_responder("config", f"config/{config}"))["config"]
-    return json.loads(json.dumps(data), object_hook=lambda dat: SimpleNamespace(**dat))
-
-
 def create_dynamodb_table(dynamodb, table):
     """ Create a DynamoDB table
         Keyword arguments:
@@ -104,40 +70,6 @@ def create_dynamodb_table(dynamodb, table):
     table.wait_until_exists()
 
 
-def sql_error(err):
-    """ Log a critical SQL error and exit
-        Keyword arguments:
-          err: error object
-        Returns:
-          None
-    """
-    try:
-        msg = f"MySQL error [{err.args[0]}]: {err.args[1]}"
-    except IndexError:
-        msg = f"MySQL error: {err}"
-    return msg
-
-
-def db_connect(dbc):
-    """ Connect to a database
-        Keyword arguments:
-          dbd: database object
-        Returns:
-          connector and cursor
-    """
-    LOGGER.info("Connecting to %s on %s", dbc.name, dbc.host)
-    try:
-        conn = MySQLdb.connect(host=dbc.host, user=dbc.user,
-                               passwd=dbc.password, db=dbc.name)
-    except MySQLdb.Error as err:
-        terminate_program(sql_error(err))
-    try:
-        cursor = conn.cursor(MySQLdb.cursors.DictCursor)
-    except MySQLdb.Error as err:
-        terminate_program(sql_error(err))
-    return conn, cursor
-
-
 def initialize_program():
     """ Initialize the program
         Keyword arguments:
@@ -145,22 +77,19 @@ def initialize_program():
         Returns:
           None
     """
-    dbconfig = create_config_object("db_config")
+    try:
+        dbconfig = JRC.get_config("databases")
+    except Exception as err: # pylint: disable=broad-exception-caught
+        terminate_program(err)
     # MySQL
-    LOGGER.info("Connecting to SAGE")
-    dbc = getattr(getattr(dbconfig, "sage"), "prod")
-    DATABASE['sage'] = {"conn": "", "cursor": ""}
-    DATABASE['sage']['conn'], DATABASE['sage']['cursor'] = db_connect(dbc)
+    dbo = attrgetter("sage.prod.read")(dbconfig)
+    LOGGER.info("Connecting to %s %s on %s as %s", dbo.name, 'prod', dbo.host, dbo.user)
+    DATABASE['sage'] = JRC.connect_database(dbo)
     # MongoDB
-    LOGGER.info("Connecting to neuronbridge MongoDB on %s", ARG.MONGO)
-    try:
-        dbc = getattr(getattr(dbconfig, "neuronbridge-mongo"), ARG.MONGO)
-        rset = 'rsProd' if ARG.MONGO == 'prod' else 'rsDev'
-        client = MongoClient(dbc.read.host, replicaSet=rset, username=dbc.read.user,
-                             password=dbc.read.password)
-        DATABASE["NB"] = client.neuronbridge
-    except Exception as err:
-        terminate_program(f"Could not connect to Mongo: {err}")
+    rwp = "write" if ARG.WRITE else "read"
+    dbo = attrgetter(f"neuronbridge.{ARG.MONGO}.{rwp}")(dbconfig)
+    LOGGER.info("Connecting to %s %s on %s as %s", dbo.name, ARG.MONGO, dbo.host, dbo.user)
+    DATABASE['NB'] = JRC.connect_database(dbo)
     # DynamoDB
     if not ARG.VERSION:
         ARG.VERSION = NB.get_neuronbridge_version(DATABASE["NB"]["neuronMetadata"])
@@ -174,6 +103,7 @@ def initialize_program():
     ver = ARG.VERSION
     if not ver.startswith("v"):
         ver = f"v{ARG.VERSION}"
+    ARG.DDBVERSION = ver
     table = "janelia-neuronbridge-published-" + ver
     if ARG.MANIFOLD != "prod":
         table += f"-{ARG.MANIFOLD}"
@@ -209,7 +139,7 @@ def get_release(slide_code):
         DATABASE['sage']['cursor'].execute(sql, (slide_code,))
         row = DATABASE['sage']['cursor'].fetchone()
     except MySQLdb.Error as err:
-        sql_error(err)
+        terminate_program(JRC.sql_error(err))
     if row and row['alps_release']:
         FAILURE[slide_code] = f"Slide code {slide_code} is published to " \
                               + f"{row['alps_release']} in SAGE"
@@ -227,9 +157,10 @@ def valid_row(row):
     if "publishedName" not in row or not row["publishedName"]:
         get_release(row['slideCode'])
         if row['slideCode'] not in FAILURE:
-            get_release(sample)
+            get_release(row['sourceRefId'])
         LOGGER.error("%s: %s", row['_id'], FAILURE[row['slideCode']])
-        #LOGGER.error("Missing publishedName for %s (%s) in %s", row['_id'], row['slideCode'], row['libraryName'])
+        #LOGGER.error("Missing publishedName for %s (%s) in %s", row['_id'], row['slideCode'],
+        #             row['libraryName'])
         COUNT["missing"] += 1
         return False
     if row["publishedName"].lower() == "no consensus":
@@ -239,6 +170,12 @@ def valid_row(row):
 
 
 def build_bodyid_list(bodyids):
+    ''' Build a list of {body ID: bool} dicts flagging which body IDs are in KNOWN_PPP
+        Keyword arguments:
+          bodyids: list of body IDs
+        Returns:
+          List
+    '''
     blist = []
     for bid in bodyids:
         blist.append({bid: bid in KNOWN_PPP})
@@ -270,6 +207,18 @@ def batch_row(name, keytype, matches, bodyids=None):
     KEYS[name] = True
 
 
+def update_ddb_nb(library):
+    ''' Increment the row count for a library in DDB_NB (entry is created on first use)
+        Keyword arguments:
+          library: library name
+        Returns:
+          None
+    '''
+    if library not in DDB_NB:
+        DDB_NB[library] = {"version": ARG.VERSION, "count": 0}
+    DDB_NB[library]["count"] += 1
+
+
 def primary_update(rlist, matches):
     ''' Run primary update to batch simple items (publishingName and bodyID)
         Keyword arguments:
@@ -284,6 +233,7 @@ def primary_update(rlist, matches):
         if row["libraryName"].startswith("flyem"):
             keytype = "bodyID"
         batch_row(name, keytype, matches[name])
+        update_ddb_nb(row["libraryName"])
 
 
 def add_neuron(neuron, ntype):
@@ -426,7 +376,7 @@ def process_results(count, results):
                 neurons[ntype][row[ntype]] = True
     if len(rlist) != len(matches):
         terminate_program(f"Unique primary list ({len(rlist)}) != match list({len(matches)})")
-    print(f"Libraries:")
+    print("Libraries:")
     liblen = cntlen = 0
     for lib in library:
         if len(lib) > liblen:
@@ -491,12 +441,23 @@ def update_dynamo():
     for row in pppresults:
         KNOWN_PPP[row.split("-")[0]] = True
     process_results(count, results)
+    if not ARG.WRITE:
+        return
+    coll = DATABASE["NB"]["ddb_published_versioned"]
+    LOGGER.info("Updating ddb_published_versioned for version %s", ARG.DDBVERSION)
+    payload = coll.find_one({"dynamodb_version": ARG.DDBVERSION})
+    if not payload:
+        payload = {"dynamodb_version": ARG.DDBVERSION,
+                   "components": {}}
+    for lib in DDB_NB:
+        payload['components'][lib] = DDB_NB[lib]
+    coll.update_one({"dynamodb_version": ARG.DDBVERSION}, {"$set": payload}, upsert=True)
 
 
 if __name__ == '__main__':
     PARSER = argparse.ArgumentParser(
         description="Update a janelia-neuronbridge-published-* table")
-    PARSER.add_argument('--version', dest='VERSION', default='', help='NeuronBridge version')
+    PARSER.add_argument('--version', dest='VERSION', default='', help='NeuronBridge tag version')
     PARSER.add_argument('--ddbversion', dest='DDBVERSION', default='',
                         help='DynamoDB NeuronBridge version')
     PARSER.add_argument('--mongo', dest='MONGO', action='store',
@@ -513,18 +474,7 @@ def update_dynamo():
     PARSER.add_argument('--debug', dest='DEBUG', action='store_true',
                         default=False, help='Flag, Very chatty')
     ARG = PARSER.parse_args()
-    LOGGER = colorlog.getLogger()
-    ATTR = colorlog.colorlog.logging if "colorlog" in dir(colorlog) else colorlog
-    if ARG.DEBUG:
-        LOGGER.setLevel(ATTR.DEBUG)
-    elif ARG.VERBOSE:
-        LOGGER.setLevel(ATTR.INFO)
-    else:
-        LOGGER.setLevel(ATTR.WARNING)
-    HANDLER = colorlog.StreamHandler()
-    HANDLER.setFormatter(colorlog.ColoredFormatter())
-    LOGGER.addHandler(HANDLER)
-    REST = create_config_object("rest_services")
+    LOGGER = JRC.setup_logging(ARG)
     initialize_program()
     update_dynamo()
     terminate_program()