diff --git a/backend/entityservice/async_worker.py b/backend/entityservice/async_worker.py
index 33fd4710..34bd8cdc 100644
--- a/backend/entityservice/async_worker.py
+++ b/backend/entityservice/async_worker.py
@@ -3,11 +3,12 @@
 import os
 import random
 
-import anonlink
 import minio
+import structlog
 from celery import Celery, chord
-from celery.signals import after_setup_task_logger, after_setup_logger
-from celery.utils.log import get_task_logger
+from celery.signals import setup_logging, after_setup_task_logger, after_setup_logger, task_prerun
+
+import anonlink
 
 from entityservice import cache
 from entityservice.database import *
@@ -33,20 +34,30 @@
 celery.conf.CELERY_ACKS_LATE = config.CELERY_ACKS_LATE
 celery.conf.CELERY_ROUTES = config.CELERY_ROUTES
 
-
-@after_setup_logger.connect
-@after_setup_task_logger.connect
-def init_celery_logger(logger, **kwargs):
-    level = logging.DEBUG if config.DEBUG else logging.INFO
-
-    for handler in logger.handlers[:]:
-        handler.setFormatter(config.consoleFormat)
-        handler.setLevel(level)
-
-    logger.debug("Set logging up")
+structlog.configure(
+    processors=[
+        structlog.stdlib.add_logger_name,
+        structlog.stdlib.add_log_level,
+        structlog.stdlib.PositionalArgumentsFormatter(),
+        structlog.processors.StackInfoRenderer(),
+        structlog.processors.format_exc_info,
+        structlog.dev.ConsoleRenderer()
+    ],
+    context_class=structlog.threadlocal.wrap_dict(dict),
+    logger_factory=structlog.stdlib.LoggerFactory(),
+)
+
+
+@task_prerun.connect
+def configure_structlog(sender, body=None, **kwargs):
+    logger = structlog.get_logger()
+    logger.new(
+        task_id=kwargs['task_id'],
+        task_name=sender.__name__
+    )
 
-logger = get_task_logger(__name__)
+logger = structlog.wrap_logger(logging.getLogger('celery'))
 
 logger.info("Setting up celery worker")
@@ -98,10 +109,11 @@ def convert_mapping_to_list(permutation):
 
 @celery.task(base=BaseTask, ignore_result=True)
 def handle_raw_upload(project_id, dp_id, receipt_token):
-    logger.info("Project {} - handling user uploaded file for dp: {}".format(project_id, dp_id))
+    log = logger.bind(pid=project_id, dp_id=dp_id)
+    log.info("Handling user uploaded file")
     with DBConn() as db:
         expected_size = get_number_of_hashes(db, dp_id)
-    logger.info("Expecting to handle {} hashes".format(expected_size))
+    log.info("Expecting to handle {} hashes".format(expected_size))
 
     mc = connect_to_object_store()
@@ -120,31 +132,31 @@ def handle_raw_upload(project_id, dp_id, receipt_token):
     clkcounts = []
 
     def filter_generator():
-        logger.debug("Deserializing json filters")
+        log.debug("Deserializing json filters")
         for i, line in enumerate(text_stream):
             ba = deserialize_bitarray(line)
             yield (ba, i, ba.count())
             clkcounts.append(ba.count())
-        logger.info("Processed {} hashes".format(len(clkcounts)))
+        log.info("Processed {} hashes".format(len(clkcounts)))
 
     python_filters = filter_generator()
 
     # If small enough preload the data into our redis cache
     if expected_size < config.ENTITY_CACHE_THRESHOLD:
-        logger.info("Caching pickled clk data")
+        log.info("Caching pickled clk data")
         python_filters = list(python_filters)
         cache.set_deserialized_filter(dp_id, python_filters)
     else:
-        logger.info("Not caching clk data as it is too large")
+        log.info("Not caching clk data as it is too large")
 
     packed_filters = binary_pack_filters(python_filters)
     packed_filter_stream = iterable_to_stream(packed_filters)
-    logger.debug("Creating binary file with index, base64 encoded CLK, popcount")
+    log.debug("Creating binary file with index, base64 encoded CLK, popcount")
 
     # Upload to object store
-    logger.info("Uploading binary packed clks to object store. Size: {}".format(fmt_bytes(num_bytes)))
logger.info("Uploading binary packed clks to object store. Size: {}".format(fmt_bytes(num_bytes))) + log.info("Uploading binary packed clks to object store. Size: {}".format(fmt_bytes(num_bytes))) mc.put_object(config.MINIO_BUCKET, filename, data=packed_filter_stream, length=num_bytes) with DBConn() as conn: @@ -153,7 +165,7 @@ def filter_generator(): # Now work out if all parties have added their data if clks_uploaded_to_project(project_id, check_data_ready=True): - logger.info("Project {} - All parties data present. Scheduling any queued runs".format(project_id)) + log.info("All parties' data present. Scheduling any queued runs") check_for_executable_runs.delay(project_id) @@ -163,16 +175,17 @@ def check_for_executable_runs(project_id): This is called when a run is posted (if project is ready for runs), and also after all dataproviders have uploaded CLKs, and the CLKS are ready. """ - logger.info("Checking for runs that need to be executed for project {}".format(project_id)) + log = logger.bind(pid=project_id) + log.info("Checking for runs that need to be executed") if not clks_uploaded_to_project(project_id, check_data_ready=True): return with DBConn() as conn: new_runs = get_created_runs_and_queue(conn, project_id) - logger.info("Creating tasks for {} created runs for project {}".format(len(new_runs), project_id)) + log.info("Creating tasks for {} created runs for project {}".format(len(new_runs), project_id)) for qr in new_runs: run_id = qr[0] - logger.info('Queueing run {} for computation'.format(qr)) + log.info('Queueing run for computation', run_id=run_id) # Record that the run has reached a new stage progress_stage(conn, run_id) compute_run.delay(project_id, run_id) @@ -180,42 +193,43 @@ def check_for_executable_runs(project_id): @celery.task(base=BaseTask, ignore_result=True) def compute_run(project_id, run_id): - logger.debug("Sanity check that we need to compute run") + log = logger.bind(pid=project_id, run_id=run_id) + log.debug("Sanity check that we need to compute run") with DBConn() as conn: res = get_run(conn, run_id) if res is None: - logger.info(f"Run '{run_id}' not found. Skipping") + log.info(f"Run not found. Skipping") raise RunDeleted(run_id) if res['state'] in {'completed', 'error'}: - logger.info("Run '{}' is already finished. Skipping".format(run_id)) + log.info("Run is already finished. 
Skipping") return - logger.debug("Setting run as in progress") + log.debug("Setting run as in progress") update_run_set_started(conn, run_id) threshold = res['threshold'] - logger.debug("Getting dp ids for compute similarity task") + log.debug("Getting dp ids for compute similarity task") dp_ids = get_dataprovider_ids(conn, project_id) - logger.debug("Data providers: {}".format(dp_ids)) + log.debug("Data providers: {}".format(dp_ids)) assert len(dp_ids) == 2, "Only support two party comparisons at the moment" compute_similarity.delay(project_id, run_id, dp_ids, threshold) - logger.info("CLK similarity computation scheduled") + log.info("CLK similarity computation scheduled") @celery.task(base=BaseTask, ignore_result=True) def compute_similarity(project_id, run_id, dp_ids, threshold): - + log = logger.bind(pid=project_id, run_id=run_id) assert len(dp_ids) >= 2, "Expected at least 2 data providers" - logger.info("Starting comparison of CLKs from data provider ids: {}, {}".format(dp_ids[0], dp_ids[1])) + log.info("Starting comparison of CLKs from data provider ids: {}, {}".format(dp_ids[0], dp_ids[1])) conn = connect_db() dataset_sizes = get_project_dataset_sizes(conn, project_id) if len(dataset_sizes) < 2: - logger.warning("Unexpected number of dataset sizes in db. Stopping") + log.warning("Unexpected number of dataset sizes in db. Stopping") update_run_mark_failure(conn, run_id) return else: @@ -230,18 +244,18 @@ def compute_similarity(project_id, run_id, dp_ids, threshold): #lenf2 = len(filters2) size = lenf1 * lenf2 - logger.info("Computing similarity for {} x {} entities".format(lenf1, lenf2)) + log.info("Computing similarity for {} x {} entities".format(lenf1, lenf2)) - logger.info("Computing similarity using greedy method") + log.info("Computing similarity using greedy method") filters1_object_filename = get_filter_metadata(conn, dp_ids[0]) filters2_object_filename = get_filter_metadata(conn, dp_ids[1]) - logger.debug("Chunking computation task") + log.debug("Chunking computation task") chunk_size = config.get_task_chunk_size(size, threshold) if chunk_size is None: chunk_size = max(lenf1, lenf2) - logger.info("Chunks will contain {} entities per task".format(chunk_size)) + log.info("Chunks will contain {} entities per task".format(chunk_size)) update_run_chunk(conn, project_id, chunk_size) job_chunks = [] @@ -262,7 +276,7 @@ def compute_similarity(project_id, run_id, dp_ids, threshold): for dp2_chunk in dp2_chunks: job_chunks.append((dp1_chunk, dp2_chunk, )) - logger.info("Chunking into {} computation tasks each with (at most) {} entities.".format( + log.info("Chunking into {} computation tasks each with (at most) {} entities.".format( len(job_chunks), chunk_size)) # Prepare the Celery Chord that will compute all the similarity scores: @@ -296,7 +310,8 @@ def on_chord_error(*args, **kwargs): @celery.task(base=BaseTask, ignore_result=True) def save_and_permute(similarity_result, project_id, run_id): - logger.debug("Saving and possibly permuting data") + log = logger.bind(pid=project_id, run_id=run_id) + log.debug("Saving and possibly permuting data") mapping = similarity_result['mapping'] # Note Postgres requires JSON object keys to be strings @@ -306,15 +321,15 @@ def save_and_permute(similarity_result, project_id, run_id): result_type = get_project_column(db, project_id, 'result_type') # Just save the raw "mapping" - logger.debug("Saving the resulting map data to the db") + log.debug("Saving the resulting map data to the db") result_id = insert_mapping_result(db, run_id, mapping) 
         dp_ids = get_dataprovider_ids(db, project_id)
-    logger.info("Mapping result saved to db with id {}".format(result_id))
+    log.info("Mapping result saved to db with result id {}".format(result_id))
 
     if result_type == "permutations":
-        logger.debug("Submitting job to permute mapping")
+        log.debug("Submitting job to permute mapping")
         permute_mapping_data.apply_async(
             (
                 project_id,
@@ -324,11 +339,11 @@
             )
         )
     else:
-        logger.debug("Mark mapping job as complete")
+        log.debug("Mark mapping job as complete")
         mark_mapping_complete.delay(run_id)
 
     # Post similarity computation cleanup
-    logger.debug("Removing clk filters from redis cache")
+    log.debug("Removing clk filters from redis cache")
     for dp_id in dp_ids:
         cache.remove_from_cache(dp_id)
 
@@ -345,14 +360,15 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
     :param len_filters2:
 
     """
+    log = logger.bind(pid=project_id, run_id=run_id)
     db = connect_db()
     mapping_str = get_run_result(db, run_id)
 
    # Convert to int: int
     mapping = {int(k): int(mapping_str[k]) for k in mapping_str}
 
-    logger.info("Creating random permutations")
-    logger.debug("Entities in dataset A: {}, Entities in dataset B: {}".format(len_filters1, len_filters2))
+    log.info("Creating random permutations")
+    log.debug("Entities in dataset A: {}, Entities in dataset B: {}".format(len_filters1, len_filters2))
 
     """
     Pack all the entities that match in the **same** random locations in both permutations.
@@ -361,7 +377,7 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
     Dictionaries first, then converted to lists.
     """
     smaller_dataset_size = min(len_filters1, len_filters2)
-    logger.debug("Smaller dataset size is {}".format(smaller_dataset_size))
+    log.debug("Smaller dataset size is {}".format(smaller_dataset_size))
     number_in_common = len(mapping)
     a_permutation = {}  # Should be length of filters1
     b_permutation = {}  # length of filters2
@@ -371,9 +387,9 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
 
     # start with all the possible indexes
     remaining_new_indexes = list(range(smaller_dataset_size))
-    logger.info("Shuffling indices for matched entities")
+    log.info("Shuffling indices for matched entities")
     random.shuffle(remaining_new_indexes)
-    logger.info("Assigning random indexes for {} matched entities".format(number_in_common))
+    log.info("Assigning random indexes for {} matched entities".format(number_in_common))
 
     for mapping_number, a_index in enumerate(mapping):
         b_index = mapping[a_index]
@@ -388,7 +404,7 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
         mask[mapping_index] = True
 
     remaining_new_indexes = set(remaining_new_indexes[number_in_common:])
-    logger.info("Randomly adding all non matched entities")
+    log.info("Randomly adding all non matched entities")
 
     # Note the a and b datasets could be of different size.
     # At this point, both still have to use the remaining_new_indexes, and any
@@ -396,7 +412,7 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
     remaining_a_values = list(set(range(smaller_dataset_size, len_filters1)).union(remaining_new_indexes))
     remaining_b_values = list(set(range(smaller_dataset_size, len_filters2)).union(remaining_new_indexes))
 
-    logger.debug("Shuffle the remaining indices")
+    log.debug("Shuffle the remaining indices")
     random.shuffle(remaining_a_values)
     random.shuffle(remaining_b_values)
 
@@ -423,7 +439,7 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
             mapping_index = remaining_b_values.pop()
             b_permutation[b_index] = mapping_index
 
-    logger.debug("Completed creating new permutations for each party")
+    log.debug("Completed creating new permutations for each party")
 
     with db.cursor() as cur:
         dp_ids = get_dataprovider_ids(db, project_id)
@@ -432,20 +448,20 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
 
             # We convert here because celery and dicts with int keys don't play nice
             perm_list = convert_mapping_to_list(permutation)
-            logger.debug("Saving a permutation")
+            log.debug("Saving a permutation")
             insert_permutation(cur, dp_ids[i], run_id, perm_list)
 
-        logger.debug("Raw permutation data saved. Now saving raw mask")
+        log.debug("Raw permutation data saved. Now saving raw mask")
 
         # Convert the mask dict to a list of 0/1 ints
         mask_list = convert_mapping_to_list({
            int(key): 1 if value else 0
            for key, value in mask.items()})
 
-        logger.debug("Saving the mask")
+        log.debug("Saving the mask")
         insert_permutation_mask(cur, project_id, run_id, mask_list)
 
-    logger.info("Mask saved")
+    log.info("Mask saved")
     db.commit()
 
     mark_mapping_complete.delay(run_id)
@@ -453,11 +469,12 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2):
 
 
 @celery.task(base=BaseTask, ignore_results=True)
 def mark_mapping_complete(run_id):
-    logger.debug("Marking run complete")
+    log = logger.bind(run_id=run_id)
+    log.debug("Marking run complete")
     with DBConn() as db:
         update_run_mark_complete(db, run_id)
     calculate_comparison_rate.delay()
-    logger.info("Run {} marked as complete".format(run_id))
+    log.info("Run marked as complete")
 
 
 @celery.task(base=BaseTask)
@@ -474,24 +491,25 @@ def compute_filter_similarity(chunk_info_dp1, chunk_info_dp2, project_id, run_id
     :param project_id:
     :param threshold:
     """
-    logger.debug("Computing similarity for a chunk of filters")
+    log = logger.bind(pid=project_id, run_id=run_id)
+    log.debug("Computing similarity for a chunk of filters")
 
-    logger.debug("Checking that the resource exists (in case of job being canceled)")
+    log.debug("Checking that the resource exists (in case of job being canceled)")
     with DBConn() as db:
         if not check_run_exists(db, project_id, run_id):
-            logger.info("Skipping as resource not found.")
+            log.info("Skipping as resource not found.")
             return None
 
     t0 = time.time()
-    logger.debug("Fetching and deserializing chunk of filters for dataprovider 1")
+    log.debug("Fetching and deserializing chunk of filters for dataprovider 1")
     chunk_dp1, chunk_dp1_size = get_chunk_from_object_store(chunk_info_dp1)
 
    t1 = time.time()
-    logger.debug("Fetching and deserializing chunk of filters for dataprovider 2")
+    log.debug("Fetching and deserializing chunk of filters for dataprovider 2")
     chunk_dp2, chunk_dp2_size = get_chunk_from_object_store(chunk_info_dp2)
 
     t2 = time.time()
-    logger.debug("Calculating filter similarity")
+    log.debug("Calculating filter similarity")
     chunk_results = anonlink.entitymatch.calculate_filter_similarity(chunk_dp1, chunk_dp2,
                                                                      threshold=threshold,
                                                                      k=min(chunk_dp1_size, chunk_dp2_size),
@@ -509,7 +527,7 @@ def compute_filter_similarity(chunk_info_dp1, chunk_info_dp2, project_id, run_id
     offset_dp1 = chunk_info_dp1[1]
     offset_dp2 = chunk_info_dp2[1]
 
-    logger.debug("Offset DP1 by: {}, DP2 by: {}".format(offset_dp1, offset_dp2))
+    log.debug("Offset DP1 by: {}, DP2 by: {}".format(offset_dp1, offset_dp2))
     for (ia, score, ib) in chunk_results:
         partial_sparse_result.append((ia + offset_dp1, ib + offset_dp2, score))
 
@@ -518,7 +536,7 @@ def compute_filter_similarity(chunk_info_dp1, chunk_info_dp2, project_id, run_id
     num_results = len(partial_sparse_result)
     if num_results > 0:
         result_filename = 'chunk-res-{}.csv'.format(generate_code(12))
-        logger.info("Writing {} intermediate results to file: {}".format(num_results, result_filename))
+        log.info("Writing {} intermediate results to file: {}".format(num_results, result_filename))
 
         with open(result_filename, 'wt') as f:
             csvwriter = csv.writer(f)
@@ -530,7 +548,7 @@ def compute_filter_similarity(chunk_info_dp1, chunk_info_dp2, project_id, run_id
         try:
             mc.fput_object(config.MINIO_BUCKET, result_filename, result_filename)
         except minio.ResponseError as err:
-            logger.warning("Failed to store result in minio")
+            log.warning("Failed to store result in minio")
             raise
 
         # If we don't delete the file we *do* run out of space
@@ -539,8 +557,8 @@ def compute_filter_similarity(chunk_info_dp1, chunk_info_dp2, project_id, run_id
         result_filename = None
 
     t6 = time.time()
-    logger.info("run={} Comparisons: {}, Links above threshold: {}".format(run_id, comparisons_computed, len(chunk_results)))
-    logger.info("Prep: {:.3f} + {:.3f}, Solve: {:.3f}, Progress: {:.3f}, Offset: {:.3f}, Save: {:.3f}, Total: {:.3f}".format(
+    log.info("run={} Comparisons: {}, Links above threshold: {}".format(run_id, comparisons_computed, len(chunk_results)))
+    log.info("Prep: {:.3f} + {:.3f}, Solve: {:.3f}, Progress: {:.3f}, Offset: {:.3f}, Save: {:.3f}, Total: {:.3f}".format(
         t1 - t0,
         t2 - t1,
         t3 - t2,
@@ -566,7 +584,7 @@ def get_chunk_from_object_store(chunk_info):
 
 
 def save_current_progress(comparisons, run_id):
-    logger.debug("Progress. Compared {} CLKS".format(comparisons))
+    logger.debug("Progress. Compared {} CLKS".format(comparisons), run_id=run_id)
     cache.update_progress(comparisons, run_id)
 
 
@@ -576,6 +594,7 @@ def save_current_progress(comparisons, run_id):
              autoretry_for=(minio.ResponseError,),
              retry_backoff=True)
 def aggregate_filter_chunks(similarity_result_files, project_id, run_id):
+    log = logger.bind(pid=project_id, run_id=run_id)
     if similarity_result_files is None: return
     mc = connect_to_object_store()
     files = []
@@ -585,12 +604,12 @@ def aggregate_filter_chunks(similarity_result_files, project_id, run_id):
             files.append(filename)
             data_size += mc.stat_object(config.MINIO_BUCKET, filename).size
 
-    logger.debug("Aggregating result chunks from {} files, total size: {}".format(
+    log.debug("Aggregating result chunks from {} files, total size: {}".format(
         len(files), fmt_bytes(data_size)))
 
     result_file_stream_generator = (mc.get_object(config.MINIO_BUCKET, result_filename) for result_filename in files)
 
-    logger.info("Similarity score results are {}".format(fmt_bytes(data_size)))
+    log.info("Similarity score results are {}".format(fmt_bytes(data_size)))
     result_stream = chain_streams(result_file_stream_generator)
 
     with DBConn() as db:
@@ -610,14 +629,14 @@ def aggregate_filter_chunks(similarity_result_files, project_id, run_id):
     # DB now committed, we can fire off tasks that depend on the new db state
     if result_type == "similarity_scores":
-        logger.info("Deleting intermediate similarity score files from object store")
+        log.info("Deleting intermediate similarity score files from object store")
         mc.remove_objects(config.MINIO_BUCKET, files)
 
-        logger.debug("Removing clk filters from redis cache")
+        log.debug("Removing clk filters from redis cache")
         cache.remove_from_cache(dp_ids[0])
         cache.remove_from_cache(dp_ids[1])
 
         # Complete the run
-        logger.info("Marking run {} as complete".format(run_id))
+        log.info("Marking run as complete")
         mark_mapping_complete.delay(run_id)
     else:
         solver_task.delay(result_filename, project_id, run_id, lenf1, lenf2)
@@ -625,18 +644,19 @@
 
 @celery.task(base=BaseTask, ignore_result=True)
 def solver_task(similarity_scores_filename, project_id, run_id, lenf1, lenf2):
+    log = logger.bind(pid=project_id, run_id=run_id)
     mc = connect_to_object_store()
     score_file = mc.get_object(config.MINIO_BUCKET, similarity_scores_filename)
-    logger.debug("Creating python sparse matrix from bytes data")
+    log.debug("Creating python sparse matrix from bytes data")
     sparse_matrix = similarity_matrix_from_csv_bytes(score_file.data)
 
-    logger.info("Calculating the optimal mapping from similarity matrix")
+    log.info("Calculating the optimal mapping from similarity matrix")
     mapping = anonlink.entitymatch.greedy_solver(sparse_matrix)
 
-    logger.debug("Converting all indices to strings")
+    log.debug("Converting all indices to strings")
     for key in mapping:
         mapping[key] = str(mapping[key])
-    logger.info("Entity mapping has been computed")
+    log.info("Entity mapping has been computed")
 
     res = {
         "mapping": mapping,
@@ -657,9 +677,10 @@ def store_similarity_scores(buffer, run_id, length, conn):
     :param run_id:
     :param length:
     """
+    log = logger.bind(run_id=run_id)
     filename = config.SIMILARITY_SCORES_FILENAME_FMT.format(run_id)
 
-    logger.info("Storing similarity score results in CSV file: {}".format(filename))
+    log.info("Storing similarity score results in CSV file: {}".format(filename))
     mc = connect_to_object_store()
     mc.put_object(
         config.MINIO_BUCKET,
@@ -670,11 +691,11 @@ def store_similarity_scores(buffer, run_id, length, conn):
     )
 
     try:
-        logger.debug("Storing the CSV filename '{}' in the database".format(filename))
+        log.debug("Storing the CSV filename '{}' in the database".format(filename))
         result_id = insert_similarity_score_file(conn, run_id, filename)
-        logger.debug("Saved path to similarity scores file to db with id {}".format(result_id))
+        log.debug("Saved path to similarity scores file to db with id {}".format(result_id))
     except psycopg2.IntegrityError:
-        logger.info("Error saving similarity score filename to database. Suspect that project has been deleted")
+        log.info("Error saving similarity score filename to database. Suspect that project has been deleted")
         raise RunDeleted(run_id)
 
     return filename
diff --git a/backend/entityservice/cache.py b/backend/entityservice/cache.py
index 0c3beb50..61330720 100644
--- a/backend/entityservice/cache.py
+++ b/backend/entityservice/cache.py
@@ -2,14 +2,14 @@
 import pickle
 
-import logging
+import structlog
 
 from entityservice import serialization
 from entityservice import database
 from entityservice.object_store import connect_to_object_store
 from entityservice.settings import Config as config
 
-logger = logging.getLogger('cache')
+logger = structlog.get_logger()
 
 redis_host = config.REDIS_SERVER
 redis_pass = config.REDIS_PASSWORD
 
diff --git a/backend/entityservice/database/util.py b/backend/entityservice/database/util.py
index 9d47253a..4be67f6f 100644
--- a/backend/entityservice/database/util.py
+++ b/backend/entityservice/database/util.py
@@ -4,12 +4,12 @@
 import psycopg2
 import psycopg2.extras
 from flask import current_app, g
-
+from structlog import get_logger
 from entityservice import database as db
 from entityservice.errors import DatabaseInconsistent, DBResourceMissing
 from entityservice.settings import Config as config
 
-logger = logging.getLogger('db')
+logger = get_logger()
 
 
 def query_db(db, query, args=(), one=False):
@@ -19,7 +19,7 @@ def query_db(db, query, args=(), one=False):
     https://flask-doc.readthedocs.org/en/latest/patterns/sqlite3.html#easy-querying
     """
     with db.cursor() as cur:
-        logger.debug(f"Query: query")
+        logger.debug(f"Query: {query}")
         cur.execute(query, args)
         rv = [dict((cur.description[idx][0], value)
                    for idx, value in enumerate(row)) for row in cur.fetchall()]
diff --git a/backend/entityservice/models/run.py b/backend/entityservice/models/run.py
index eb6930d5..0798078a 100644
--- a/backend/entityservice/models/run.py
+++ b/backend/entityservice/models/run.py
@@ -36,7 +36,8 @@
 
 
 def progress_run_stage(conn, run_id):
-    logger.info("Run progressing to next stage")
+    log = logger.bind(run_id=run_id)
+    log.info("Run progressing to next stage")
     db.progress_run_stage(conn, run_id)
     # clear progress in cache
     cache.clear_progress(run_id)
diff --git a/backend/entityservice/utils.py b/backend/entityservice/utils.py
index b29d1a98..3daa97f8 100644
--- a/backend/entityservice/utils.py
+++ b/backend/entityservice/utils.py
@@ -1,16 +1,19 @@
 #!/usr/bin/env python3
-import binascii
+
 import io
 import json
 import os
-import logging
+import binascii
 
 import bitmath
 from flask import request
 from connexion import ProblemException
+from structlog import get_logger
 
 from entityservice.database import connect_db, get_number_parties_uploaded, get_project_column, get_number_parties_ready
 
+logger = get_logger()
+
 
 def fmt_bytes(num_bytes):
     """
@@ -170,16 +173,16 @@ def generate_code(length=24):
 
 def clks_uploaded_to_project(project_id, check_data_ready=False):
     """ See if the given mapping has had all parties contribute data.
""" - logging.info("Counting contributing parties") + logger.info("Counting contributing parties") conn = connect_db() if check_data_ready: parties_contributed = get_number_parties_ready(conn, project_id) - logging.info("Parties where data is ready: {}".format(parties_contributed)) + logger.info("Parties where data is ready: {}".format(parties_contributed)) else: parties_contributed = get_number_parties_uploaded(conn, project_id) - logging.info("Parties where data is uploaded: {}".format(parties_contributed)) + logger.info("Parties where data is uploaded: {}".format(parties_contributed)) number_parties = get_project_column(conn, project_id, 'parties') - logging.info("{}/{} parties have contributed clks".format(parties_contributed, number_parties)) + logger.info("{}/{} parties have contributed clks".format(parties_contributed, number_parties)) return parties_contributed == number_parties diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index 13a28a69..ec2465da 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -1,5 +1,4 @@ import io -import uuid from io import BytesIO import minio