Skip to content

Commit

Permalink
Added updates to ddb_published_versioned MongoDB collection
Browse files Browse the repository at this point in the history
  • Loading branch information
robsv committed Jul 17, 2023
1 parent 7f3bf6f commit 3e1efa9
Showing 1 changed file with 52 additions and 102 deletions.
154 changes: 52 additions & 102 deletions bin/update_dynamodb_published_versioned.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@

import argparse
from datetime import datetime
import json
import os
from operator import attrgetter
import re
import sys
import time
from types import SimpleNamespace
import boto3
import colorlog
import inquirer
import MySQLdb
from pymongo import MongoClient
import requests
from tqdm import tqdm
import jrc_common.jrc_common as JRC
import neuronbridge_lib as NB

# pylint: disable=W0703, E1101
Expand All @@ -33,6 +29,7 @@
FAILURE = {}
KEYS = {}
KNOWN_PPP = {}
DDB_NB = {}


def terminate_program(msg=None):
Expand All @@ -47,37 +44,6 @@ def terminate_program(msg=None):
sys.exit(-1 if msg else 0)


def call_responder(server, endpoint):
''' Call a responder
Keyword arguments:
server: server
endpoint: REST endpoint
Returns:
JSON response
'''
url = ((getattr(getattr(REST, server), "url") if server else "") if "REST" in globals() \
else (os.environ.get('CONFIG_SERVER_URL') if server else "")) + endpoint
try:
req = requests.get(url, timeout=10)
except requests.exceptions.RequestException as err:
terminate_program(err)
if req.status_code == 200:
return req.json()
terminate_program(f"Could not get response from {url}: {req.text}")
return False


def create_config_object(config):
""" Convert the JSON received from a configuration to an object
Keyword arguments:
config: configuration name
Returns:
Configuration object
"""
data = (call_responder("config", f"config/{config}"))["config"]
return json.loads(json.dumps(data), object_hook=lambda dat: SimpleNamespace(**dat))


def create_dynamodb_table(dynamodb, table):
""" Create a DynamoDB table
Keyword arguments:
Expand All @@ -104,63 +70,26 @@ def create_dynamodb_table(dynamodb, table):
table.wait_until_exists()


def sql_error(err):
""" Log a critical SQL error and exit
Keyword arguments:
err: error object
Returns:
None
"""
try:
msg = f"MySQL error [{err.args[0]}]: {err.args[1]}"
except IndexError:
msg = f"MySQL error: {err}"
return msg


def db_connect(dbc):
""" Connect to a database
Keyword arguments:
dbd: database object
Returns:
connector and cursor
"""
LOGGER.info("Connecting to %s on %s", dbc.name, dbc.host)
try:
conn = MySQLdb.connect(host=dbc.host, user=dbc.user,
passwd=dbc.password, db=dbc.name)
except MySQLdb.Error as err:
terminate_program(sql_error(err))
try:
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
except MySQLdb.Error as err:
terminate_program(sql_error(err))
return conn, cursor


def initialize_program():
""" Initialize the program
Keyword arguments:
None
Returns:
None
"""
dbconfig = create_config_object("db_config")
try:
dbconfig = JRC.get_config("databases")
except Exception as err: # pylint: disable=broad-exception-caught
terminate_program(err)
# MySQL
LOGGER.info("Connecting to SAGE")
dbc = getattr(getattr(dbconfig, "sage"), "prod")
DATABASE['sage'] = {"conn": "", "cursor": ""}
DATABASE['sage']['conn'], DATABASE['sage']['cursor'] = db_connect(dbc)
dbo = attrgetter("sage.prod.read")(dbconfig)
LOGGER.info("Connecting to %s %s on %s as %s", dbo.name, 'prod', dbo.host, dbo.user)
DATABASE['sage'] = JRC.connect_database(dbo)
# MongoDB
LOGGER.info("Connecting to neuronbridge MongoDB on %s", ARG.MONGO)
try:
dbc = getattr(getattr(dbconfig, "neuronbridge-mongo"), ARG.MONGO)
rset = 'rsProd' if ARG.MONGO == 'prod' else 'rsDev'
client = MongoClient(dbc.read.host, replicaSet=rset, username=dbc.read.user,
password=dbc.read.password)
DATABASE["NB"] = client.neuronbridge
except Exception as err:
terminate_program(f"Could not connect to Mongo: {err}")
rwp = "write" if ARG.WRITE else "read"
dbo = attrgetter(f"neuronbridge.{ARG.MONGO}.{rwp}")(dbconfig)
LOGGER.info("Connecting to %s %s on %s as %s", dbo.name, 'prod', dbo.host, dbo.user)
DATABASE['NB'] = JRC.connect_database(dbo)
# DynamoDB
if not ARG.VERSION:
ARG.VERSION = NB.get_neuronbridge_version(DATABASE["NB"]["neuronMetadata"])
Expand All @@ -174,6 +103,7 @@ def initialize_program():
ver = ARG.VERSION
if not ver.startswith("v"):
ver = f"v{ARG.VERSION}"
ARG.DDBVERSION = ver
table = "janelia-neuronbridge-published-" + ver
if ARG.MANIFOLD != "prod":
table += f"-{ARG.MANIFOLD}"
Expand Down Expand Up @@ -209,7 +139,7 @@ def get_release(slide_code):
DATABASE['sage']['cursor'].execute(sql, (slide_code,))
row = DATABASE['sage']['cursor'].fetchone()
except MySQLdb.Error as err:
sql_error(err)
terminate_program(JRC.sql_error(err))
if row and row['alps_release']:
FAILURE[slide_code] = f"Slide code {slide_code} is published to " \
+ f"{row['alps_release']} in SAGE"
Expand All @@ -227,9 +157,10 @@ def valid_row(row):
if "publishedName" not in row or not row["publishedName"]:
get_release(row['slideCode'])
if row['slideCode'] not in FAILURE:
get_release(sample)
get_release(row['sourceRefId'])
LOGGER.error("%s: %s", row['_id'], FAILURE[row['slideCode']])
#LOGGER.error("Missing publishedName for %s (%s) in %s", row['_id'], row['slideCode'], row['libraryName'])
#LOGGER.error("Missing publishedName for %s (%s) in %s", row['_id'], row['slideCode'],
# row['libraryName'])
COUNT["missing"] += 1
return False
if row["publishedName"].lower() == "no consensus":
Expand All @@ -239,6 +170,12 @@ def valid_row(row):


def build_bodyid_list(bodyids):
''' Build a list of body IDs with PPP results
Keyword arguments:
bodyids: list of body IDs
Returns:
List
'''
blist = []
for bid in bodyids:
blist.append({bid: bid in KNOWN_PPP})
Expand Down Expand Up @@ -270,6 +207,18 @@ def batch_row(name, keytype, matches, bodyids=None):
KEYS[name] = True


def update_ddb_nb(library):
''' Update the DDB_NB dict
Keyword arguments:
library: library
Returns:
None
'''
if library not in DDB_NB:
DDB_NB[library] = {"version": ARG.VERSION, "count": 0}
DDB_NB[library]["count"] += 1


def primary_update(rlist, matches):
''' Run primary update to batch simple items (publishingName and bodyID)
Keyword arguments:
Expand All @@ -284,6 +233,7 @@ def primary_update(rlist, matches):
if row["libraryName"].startswith("flyem"):
keytype = "bodyID"
batch_row(name, keytype, matches[name])
update_ddb_nb(row["libraryName"])


def add_neuron(neuron, ntype):
Expand Down Expand Up @@ -426,7 +376,7 @@ def process_results(count, results):
neurons[ntype][row[ntype]] = True
if len(rlist) != len(matches):
terminate_program(f"Unique primary list ({len(rlist)}) != match list({len(matches)})")
print(f"Libraries:")
print("Libraries:")
liblen = cntlen = 0
for lib in library:
if len(lib) > liblen:
Expand Down Expand Up @@ -491,12 +441,23 @@ def update_dynamo():
for row in pppresults:
KNOWN_PPP[row.split("-")[0]] = True
process_results(count, results)
if not ARG.WRITE:
return
coll = DATABASE["NB"]["ddb_published_versioned"]
LOGGER.info("Updating ddb_published_versioned for version %s", ARG.DDBVERSION)
payload = coll.find_one({"dynamodb_version": ARG.DDBVERSION})
if not payload:
payload = {"dynamodb_version": ARG.DDBVERSION,
"components": {}}
for lib in DDB_NB:
payload['components'][lib] = DDB_NB[lib]
results = coll.update_one({"dynamodb_version": ARG.DDBVERSION}, {"$set": payload}, upsert=True)


if __name__ == '__main__':
PARSER = argparse.ArgumentParser(
description="Update a janelia-neuronbridge-published-* table")
PARSER.add_argument('--version', dest='VERSION', default='', help='NeuronBridge version')
PARSER.add_argument('--version', dest='VERSION', default='', help='NeuronBridge tag version')
PARSER.add_argument('--ddbversion', dest='DDBVERSION', default='',
help='DynamoDB NeuronBridge version')
PARSER.add_argument('--mongo', dest='MONGO', action='store',
Expand All @@ -513,18 +474,7 @@ def update_dynamo():
PARSER.add_argument('--debug', dest='DEBUG', action='store_true',
default=False, help='Flag, Very chatty')
ARG = PARSER.parse_args()
LOGGER = colorlog.getLogger()
ATTR = colorlog.colorlog.logging if "colorlog" in dir(colorlog) else colorlog
if ARG.DEBUG:
LOGGER.setLevel(ATTR.DEBUG)
elif ARG.VERBOSE:
LOGGER.setLevel(ATTR.INFO)
else:
LOGGER.setLevel(ATTR.WARNING)
HANDLER = colorlog.StreamHandler()
HANDLER.setFormatter(colorlog.ColoredFormatter())
LOGGER.addHandler(HANDLER)
REST = create_config_object("rest_services")
LOGGER = JRC.setup_logging(ARG)
initialize_program()
update_dynamo()
terminate_program()

0 comments on commit 3e1efa9

Please sign in to comment.