From 292794d835f5650f85b0fc3c53c3360bdedcb22f Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 12 Nov 2018 14:32:16 +1100 Subject: [PATCH 01/15] Not every test needed to create a project with data dozens of times --- Jenkinsfile.groovy | 6 ++-- backend/entityservice/api_def/swagger.yaml | 4 --- backend/entityservice/tests/conftest.py | 27 ++++++++++++--- backend/entityservice/tests/test_project.py | 4 +-- .../tests/test_project_run_description.py | 20 ++++++++--- .../tests/test_project_run_listing.py | 34 +++++++++++-------- .../tests/test_project_run_posting.py | 15 +++----- .../tests/test_project_run_results.py | 2 +- .../tests/test_project_with_runs_deletes.py | 19 +++-------- backend/entityservice/tests/util.py | 12 +++++++ 10 files changed, 84 insertions(+), 59 deletions(-) diff --git a/Jenkinsfile.groovy b/Jenkinsfile.groovy index 8c2f04e6..2487e67b 100644 --- a/Jenkinsfile.groovy +++ b/Jenkinsfile.groovy @@ -82,8 +82,8 @@ node('docker&&multicore&&ram') { stage('Integration Tests') { gitCommit.setInProgressStatus(gitContextIntegrationTests); try { - DockerContainer containerTests = new DockerContainer(dockerUtils, composeProject + "_tests_1") - sleep 2 + + sleep 30 timeout(time: 60, unit: 'MINUTES') { containerTests.watchLogs() @@ -292,7 +292,7 @@ node('helm && kubectl') { pvc = createK8sTestJob(DEPLOYMENT, QuayIORepo.ENTITY_SERVICE_APP.getRepo() + ":" + TAG, serviceIP) - sleep(time: 60, unit: "SECONDS") + sleep(time: 300, unit: "SECONDS") def jobPodName = sh(script: """ kubectl get pods -l deployment=${DEPLOYMENT} -o jsonpath="{.items[0].metadata.name}" diff --git a/backend/entityservice/api_def/swagger.yaml b/backend/entityservice/api_def/swagger.yaml index 601c87d5..dd52247f 100644 --- a/backend/entityservice/api_def/swagger.yaml +++ b/backend/entityservice/api_def/swagger.yaml @@ -193,10 +193,6 @@ paths: If the `result_type` is `"mapping"` or `"similarity_scores"` then the results can be accessed with the `result_token``token`, which is provided when initially creating the mapping. - - `"permutation"`\ - If the `result_type` is `"permutation"` then the data provider can access their respective permutation and the - encrypted mask with their respective `receipt_token`, which they obtain when adding data to the mapping. - - `"permutations"`\ If the `result_type` is `permutations`, then the data providers can access their respective permutation with their individual `receipt_token`, which they obtain when adding data to the mapping. 
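[Editorial note] As context for the test changes that follow, a minimal usage sketch (assuming pytest; the `project` fixture and `temporary_blank_project` helper are the ones introduced in conftest.py and util.py below): tests that only need a project resource, not uploaded CLK data, can acquire a cheap blank project instead of generating fake data on every run.

    # Sketch only; mirrors the refactored tests below rather than adding new behaviour.
    from entityservice.tests.util import temporary_blank_project, get_runs

    def test_example_blank_project(requests):
        # Creates a data-free project and deletes it again on exit.
        with temporary_blank_project(requests, result_type='mapping') as project:
            assert get_runs(requests, project) == []
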
diff --git a/backend/entityservice/tests/conftest.py b/backend/entityservice/tests/conftest.py index 7fd30d1b..f6d4fc50 100644 --- a/backend/entityservice/tests/conftest.py +++ b/backend/entityservice/tests/conftest.py @@ -4,8 +4,8 @@ import requests as requests_library import itertools -from entityservice.tests.util import create_project_upload_fake_data, delete_project - +from entityservice.tests.util import create_project_upload_fake_data, delete_project, temporary_blank_project, \ + create_project_no_data THROTTLE_SLEEP = 0.2 @@ -50,6 +50,7 @@ def delay_next(r, *args, **kwargs): else itertools.combinations([1, 10000, 100000, 1000000], 2)) PROJECT_PARAMS = list(itertools.product(SIZES, OVERLAPS, ENCODING_SIZES)) +PROJECT_RESULT_TYPES = ['mapping', 'similarity_scores', 'permutations'] def create_project_response(requests, size, overlap, result_type, encoding_size=128): @@ -86,7 +87,7 @@ def create_project_response(requests, size, overlap, result_type, encoding_size= @pytest.fixture(scope='function', params=PROJECT_PARAMS) def mapping_project(request, requests): size, overlap, encoding_size = request.param - prj = create_project_response(requests, size, overlap, 'mapping') + prj = create_project_response(requests, size, overlap, 'mapping', encoding_size) yield prj delete_project(requests, prj) @@ -94,11 +95,27 @@ def mapping_project(request, requests): @pytest.fixture(scope='function', params=PROJECT_PARAMS) def similarity_scores_project(request, requests): size, overlap, encoding_size = request.param - prj = create_project_response(requests, size, overlap, 'similarity_scores') + prj = create_project_response(requests, size, overlap, 'similarity_scores', encoding_size) yield prj delete_project(requests, prj) +@pytest.fixture(scope='function', params=PROJECT_RESULT_TYPES) +def result_type(request): + yield request.param + + +@pytest.fixture(scope='function', params=PROJECT_RESULT_TYPES) +def project(request, requests): + result_type = request.param + project = create_project_no_data(requests, result_type) + try: + yield project + finally: + # Release project resource + delete_project(requests, project) + + @pytest.fixture(scope='function', params=ENCODING_SIZES) def encoding_size(request): yield request.param @@ -112,6 +129,6 @@ def threshold(request): @pytest.fixture(scope='function', params=PROJECT_PARAMS) def permutations_project(request, requests): size, overlap, encoding_size = request.param - prj = create_project_response(requests, size, overlap, 'permutations') + prj = create_project_response(requests, size, overlap, 'permutations', encoding_size) yield prj delete_project(requests, prj) diff --git a/backend/entityservice/tests/test_project.py b/backend/entityservice/tests/test_project.py index 79ba276c..515e108e 100644 --- a/backend/entityservice/tests/test_project.py +++ b/backend/entityservice/tests/test_project.py @@ -71,8 +71,8 @@ def test_create_then_delete_valid_auth(requests): assert delete_project_response.status_code == 204 -def test_delete_mapping_project(requests, mapping_project): - delete_project(requests, mapping_project) +def test_delete_project_types(requests, project): + delete_project(requests, project) def test_create_then_list(requests): diff --git a/backend/entityservice/tests/test_project_run_description.py b/backend/entityservice/tests/test_project_run_description.py index acedde84..d85256b2 100644 --- a/backend/entityservice/tests/test_project_run_description.py +++ b/backend/entityservice/tests/test_project_run_description.py @@ -3,13 +3,23 @@ from 
entityservice.tests.util import create_project_upload_fake_data, post_run, get_run -def test_run_description_missing_run(requests, mapping_project): - _ = get_run(requests, mapping_project, 'invalid', expected_status = 403) +def test_run_description_missing_run(requests, project): + _ = get_run(requests, project, 'invalid', expected_status = 403) -def test_run_description(requests, mapping_project): - run_id = post_run(requests, mapping_project, 0.95) - run = get_run(requests, mapping_project, run_id) +def test_run_description_no_data(requests, project): + run_id = post_run(requests, project, 0.95) + run = get_run(requests, project, run_id) + + assert 'run_id' in run + assert 'notes' in run + assert 'threshold' in run + + +def test_run_description(requests, result_type): + project, dp1, dp2 = create_project_upload_fake_data(requests, [100, 100], overlap=0.5, result_type=result_type) + run_id = post_run(requests, project, 0.98) + run = get_run(requests, project, run_id) assert 'run_id' in run assert 'notes' in run diff --git a/backend/entityservice/tests/test_project_run_listing.py b/backend/entityservice/tests/test_project_run_listing.py index afdf06bf..92a65b37 100644 --- a/backend/entityservice/tests/test_project_run_listing.py +++ b/backend/entityservice/tests/test_project_run_listing.py @@ -1,26 +1,30 @@ from entityservice.tests.config import url -from entityservice.tests.util import create_project_upload_fake_data, create_project_no_data, get_runs, post_run +from entityservice.tests.util import create_project_upload_fake_data, create_project_no_data, get_runs, post_run, \ + temporary_blank_project -def test_empty_list_run(requests, mapping_project): - r = get_runs(requests, mapping_project) - assert r == [] +def test_empty_list_run(requests): + with temporary_blank_project(requests) as project: + r = get_runs(requests, project) + assert r == [] -def test_list_run_noauth(requests, mapping_project): - r = requests.get(url + '/projects/{}/runs'.format(mapping_project['project_id'])) - assert r.status_code == 400 +def test_list_run_noauth(requests): + with temporary_blank_project(requests) as project: + r = requests.get(url + '/projects/{}/runs'.format(project['project_id'])) + assert r.status_code == 400 -def test_list_run_invalid_auth(requests, mapping_project): - _ = get_runs(requests, mapping_project, 'invalid', expected_status = 403) +def test_list_run_invalid_auth(requests): + with temporary_blank_project(requests) as project: + _ = get_runs(requests, project, 'invalid', expected_status = 403) def test_list_run_after_posting_runs(requests): - project = create_project_no_data(requests) + with temporary_blank_project(requests, result_type='mapping') as project: - for i in range(1, 11): - run_id = post_run(requests, project, 0.95) - # Check run listing has changed - runs = get_runs(requests, project) - assert len(runs) == i + for i in range(1, 11): + run_id = post_run(requests, project, 0.95) + # Check run listing has changed + runs = get_runs(requests, project) + assert len(runs) == i diff --git a/backend/entityservice/tests/test_project_run_posting.py b/backend/entityservice/tests/test_project_run_posting.py index 78c8c24e..ad6c3d42 100644 --- a/backend/entityservice/tests/test_project_run_posting.py +++ b/backend/entityservice/tests/test_project_run_posting.py @@ -1,12 +1,7 @@ -from entityservice.tests.config import url -from entityservice.tests.util import create_project_no_data, create_project_upload_fake_data, post_run, get_runs +from entityservice.tests.util import post_run, 
get_runs -# TODO: These two tests differ only in whether project has data in it -# or not; refactor with a fixture that gives different project types. -def test_posting_run_before_data_upload(requests): - project = create_project_no_data(requests) - +def test_posting_run_before_data_upload(requests, project): run_id = post_run(requests, project, 0.95) runs = get_runs(requests, project) @@ -18,9 +13,9 @@ def test_posting_run_before_data_upload(requests): assert run['state'] == 'created' -def test_posting_run_after_data_upload(requests, mapping_project): - run_id = post_run(requests, mapping_project, 0.95) - runs = get_runs(requests, mapping_project) +def test_posting_run_after_data_upload(requests, project): + run_id = post_run(requests, project, 0.95) + runs = get_runs(requests, project) assert len(runs) == 1 for run in runs: diff --git a/backend/entityservice/tests/test_project_run_results.py b/backend/entityservice/tests/test_project_run_results.py index ee2ff524..e152f38e 100644 --- a/backend/entityservice/tests/test_project_run_results.py +++ b/backend/entityservice/tests/test_project_run_results.py @@ -16,7 +16,7 @@ def test_run_similarity_score_results(requests, similarity_scores_project, thres assert 'similarity_scores' in result -def test_run_permutation_unencrypted_results(requests, permutations_project, threshold): +def test_run_permutations_results(requests, permutations_project, threshold): run_id = post_run(requests, permutations_project, threshold) mask_result = get_run_result(requests, permutations_project, run_id) assert 'mask' in mask_result diff --git a/backend/entityservice/tests/test_project_with_runs_deletes.py b/backend/entityservice/tests/test_project_with_runs_deletes.py index 2e4547b5..a92ff874 100644 --- a/backend/entityservice/tests/test_project_with_runs_deletes.py +++ b/backend/entityservice/tests/test_project_with_runs_deletes.py @@ -1,17 +1,8 @@ -from entityservice.tests.util import post_run, delete_project +from entityservice.tests.util import post_run, delete_project, create_project_upload_fake_data -def test_delete_mapping_project_after_creating_run_with_clks(requests, mapping_project): - post_run(requests, mapping_project, 0.9) - delete_project(requests, mapping_project) - - -def test_delete_similarity_project_after_creating_run_with_clks(requests, similarity_scores_project): - post_run(requests, similarity_scores_project, 0.9) - delete_project(requests, similarity_scores_project) - - -def test_delete_permutations_project_after_creating_run_with_clks(requests, permutations_project): - post_run(requests, permutations_project, 0.9) - delete_project(requests, permutations_project) +def test_deleteproject_after_creating_run_with_clks(requests, result_type): + project, dp1, dp2 = create_project_upload_fake_data(requests, [100, 100], overlap=0.5, result_type=result_type) + post_run(requests, project, 0.9) + delete_project(requests, project) diff --git a/backend/entityservice/tests/util.py b/backend/entityservice/tests/util.py index b2b09953..3412751e 100644 --- a/backend/entityservice/tests/util.py +++ b/backend/entityservice/tests/util.py @@ -4,6 +4,7 @@ import os import random import time +from contextlib import contextmanager from enum import IntEnum from bitarray import bitarray @@ -82,6 +83,17 @@ def create_project_no_data(requests, result_type='mapping'): return new_project_response.json() +@contextmanager +def temporary_blank_project(requests, result_type='mapping'): + # Code to acquire resource, e.g.: + project = create_project_no_data(requests, 
result_type) + try: + yield project + finally: + # Release project resource + delete_project(requests, project) + + def create_project_upload_fake_data(requests, size, overlap=0.75, result_type='mapping', encoding_size=128): d1, d2 = generate_overlapping_clk_data(size, overlap=overlap, encoding_size=encoding_size) return create_project_upload_data(requests, d1, d2, result_type=result_type) From 49305989e487df3f71260abe10e8533eb797356e Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 12 Nov 2018 14:43:56 +1100 Subject: [PATCH 02/15] Update jenkinsfile script to work with recent docker-compose --- Jenkinsfile.groovy | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/Jenkinsfile.groovy b/Jenkinsfile.groovy index 2487e67b..e3c9962d 100644 --- a/Jenkinsfile.groovy +++ b/Jenkinsfile.groovy @@ -64,7 +64,7 @@ node('docker&&multicore&&ram') { echo("Start all the containers (including tests)") sh """ docker-compose -v - docker-compose \ + docker-compose --no-ansi \ -f tools/docker-compose.yml \ -f tools/ci.yml \ -p ${composeProject} up -d \ @@ -82,15 +82,19 @@ node('docker&&multicore&&ram') { stage('Integration Tests') { gitCommit.setInProgressStatus(gitContextIntegrationTests); try { - + testContainerID = sh ( + script: "docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} ps -q tests", + returnStdout: true + ).trim() + DockerContainer containerTests = new DockerContainer(dockerUtils, testContainerID) sleep 30 timeout(time: 60, unit: 'MINUTES') { - containerTests.watchLogs() - sh("docker logs " + composeProject + "_nginx_1" + " &> nginx.log") - sh("docker logs " + composeProject + "_backend_1" + " &> backend.log") - sh("docker logs " + composeProject + "_worker_1" + " &> worker.log") - sh("docker logs " + composeProject + "_db_1" + " &> db.log") + containerTests.watchLogs() + sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs nginx &> nginx.log") + sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs backend &> backend.log") + sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs worker &> worker.log") + sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs db &> db.log") archiveArtifacts artifacts: "*.log", fingerprint: false if (containerTests.getExitCode() != "0") { From d212af6ca9b888aaf1e253464878abdbda113577 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 12 Nov 2018 15:17:45 +1100 Subject: [PATCH 03/15] Use a docker compose helper function for jenkins --- Jenkinsfile.groovy | 50 ++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/Jenkinsfile.groovy b/Jenkinsfile.groovy index e3c9962d..d1dd186b 100644 --- a/Jenkinsfile.groovy +++ b/Jenkinsfile.groovy @@ -17,6 +17,16 @@ DockerUtils dockerUtils GitCommit gitCommit String composeProject +String composeCmd(String composeProject, String cmd) { + return sh ( + script: """ docker-compose --no-ansi \ + -f tools/docker-compose.yml \ + -f tools/ci.yml \ + -p ${composeProject} ${cmd}""", + returnStdout: true + ).trim() +} + node('docker&&multicore&&ram') { stage ('Initialisation') { @@ -36,19 +46,19 @@ node('docker&&multicore&&ram') { try { dir("backend") { String imageNameLabel = QuayIORepo.ENTITY_SERVICE_APP.getRepo() + ":latest" - dockerUtils.dockerCommand("build -t " + imageNameLabel + " .") + dockerUtils.dockerCommand("build -t ${imageNameLabel} .") } dir("frontend") { String 
imageNameLabel = QuayIORepo.ENTITY_SERVICE_NGINX.getRepo() + ":latest" - dockerUtils.dockerCommand("build -t " + imageNameLabel + " .") + dockerUtils.dockerCommand("build -t ${imageNameLabel} .") } dir("docs") { String imageNameLabel = "quay.io/n1analytics/entity-docs:latest" - dockerUtils.dockerCommand("build -t " + imageNameLabel + " .") + dockerUtils.dockerCommand("build -t ${imageNameLabel} .") } dir("benchmarking") { String imageNameLabel = "quay.io/n1analytics/entity-benchmark:latest" - dockerUtils.dockerCommand("build -t " + imageNameLabel + " .") + dockerUtils.dockerCommand("build -t ${imageNameLabel} .") } gitCommit.setSuccessStatus(gitContextDockerBuild) } catch (err) { @@ -62,15 +72,8 @@ node('docker&&multicore&&ram') { gitCommit.setInProgressStatus(gitContextComposeDeploy); try { echo("Start all the containers (including tests)") - sh """ - docker-compose -v - docker-compose --no-ansi \ - -f tools/docker-compose.yml \ - -f tools/ci.yml \ - -p ${composeProject} up -d \ - db minio redis backend db_init worker nginx tests - """ - + sh "docker-compose -v" + composeCmd(composeProject, "up -d db minio redis backend db_init worker nginx tests") gitCommit.setSuccessStatus(gitContextComposeDeploy) } catch (err) { print("Error in compose deploy stage:\n" + err) @@ -82,19 +85,16 @@ node('docker&&multicore&&ram') { stage('Integration Tests') { gitCommit.setInProgressStatus(gitContextIntegrationTests); try { - testContainerID = sh ( - script: "docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} ps -q tests", - returnStdout: true - ).trim() + testContainerID = composeCmd(composeProject,"ps -q tests") DockerContainer containerTests = new DockerContainer(dockerUtils, testContainerID) sleep 30 timeout(time: 60, unit: 'MINUTES') { containerTests.watchLogs() - sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs nginx &> nginx.log") - sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs backend &> backend.log") - sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs worker &> worker.log") - sh("docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p ${composeProject} logs db &> db.log") + composeCmd(composeProject, "logs nginx &> nginx.log") + composeCmd(composeProject, "logs backend &> backend.log") + composeCmd(composeProject, "logs worker &> worker.log") + composeCmd(composeProject, "logs db &> db.log") archiveArtifacts artifacts: "*.log", fingerprint: false if (containerTests.getExitCode() != "0") { @@ -216,8 +216,7 @@ node('docker&&multicore&&ram') { stage("Cleaning") { try { dockerUtils.dockerLogoutQuayIOWithoutFail() - String cmdTearDown = "docker-compose -f tools/docker-compose.yml -f tools/ci.yml -p " + composeProject + " down -v" - sh cmdTearDown + composeCmd(composeProject, " down -v") } catch(Exception err) { print("Error in cleaning stage, but do nothing about it:\n" + err) // Do nothing on purpose. 
@@ -283,8 +282,7 @@ node('helm && kubectl') { -f values.yaml -f minimal-values.yaml -f test-versions.yaml \ --set api.app.debug=true \ --set api.ingress.enabled=false \ - --set api.certManager.enabled=false \ - --set provision.redis=true + --set api.certManager.enabled=false """ // give the cluster a chance to provision volumes etc, assign an IP to the service, then create a new job to test it sleep(time: 120, unit: "SECONDS") @@ -296,7 +294,7 @@ node('helm && kubectl') { pvc = createK8sTestJob(DEPLOYMENT, QuayIORepo.ENTITY_SERVICE_APP.getRepo() + ":" + TAG, serviceIP) - sleep(time: 300, unit: "SECONDS") + sleep(time: 120, unit: "SECONDS") def jobPodName = sh(script: """ kubectl get pods -l deployment=${DEPLOYMENT} -o jsonpath="{.items[0].metadata.name}" From b56c2b6057f7895239fe67b36064b149ff16cd0b Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 12 Nov 2018 15:52:41 +1100 Subject: [PATCH 04/15] Stop after first failed test --- Jenkinsfile.groovy | 1 + tools/ci.yml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Jenkinsfile.groovy b/Jenkinsfile.groovy index d1dd186b..f307db84 100644 --- a/Jenkinsfile.groovy +++ b/Jenkinsfile.groovy @@ -484,6 +484,7 @@ String createK8sTestJob(String deploymentName, String imageNameWithTag, String s "-m", "pytest", "entityservice/tests", + "-x", "--junit-xml=/mnt/results.xml" ], "volumeMounts": [[ diff --git a/tools/ci.yml b/tools/ci.yml index 793704a7..cc7a5107 100644 --- a/tools/ci.yml +++ b/tools/ci.yml @@ -7,7 +7,7 @@ services: environment: - ENTITY_SERVICE_URL=http://nginx:8851/api/v1 - INITIAL_DELAY=20 - entrypoint: /bin/sh -c "sleep 10; python -m pytest entityservice/tests" + entrypoint: /bin/sh -c "sleep 10; python -m pytest entityservice/tests -x" depends_on: - db - redis From 50983e702dfe9423ed0bf282fb8c1d1217f33bb1 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 12 Nov 2018 16:11:35 +1100 Subject: [PATCH 05/15] Increase minimal k8s limits --- deployment/entity-service/minimal-values.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/deployment/entity-service/minimal-values.yaml b/deployment/entity-service/minimal-values.yaml index 494cd7eb..9dabe7e0 100644 --- a/deployment/entity-service/minimal-values.yaml +++ b/deployment/entity-service/minimal-values.yaml @@ -5,7 +5,7 @@ api: resources: limits: memory: 512Mi - cpu: 50m + cpu: 250m requests: cpu: 50m memory: 256Mi @@ -30,7 +30,7 @@ workers: cpu: 50m limits: memory: 512Mi - cpu: 200m + cpu: 250m highmemory: replicaCount: 1 resources: @@ -38,7 +38,7 @@ workers: memory: 512Mi cpu: 100m limits: - memory: 512Mi + memory: 1024Mi cpu: 200m postgresql: From f5325df2469dd8ef7ba76f701321cb96199c8c56 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Mon, 12 Nov 2018 17:08:43 +1100 Subject: [PATCH 06/15] Set jaeger and celery logging level to WARNING --- backend/entityservice/async_worker.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/entityservice/async_worker.py b/backend/entityservice/async_worker.py index 4468e8f1..c3474158 100644 --- a/backend/entityservice/async_worker.py +++ b/backend/entityservice/async_worker.py @@ -35,11 +35,16 @@ logger_factory=structlog.stdlib.LoggerFactory(), ) +# Set logging level of other python libraries that we use +logging.getLogger('celery').setLevel(logging.WARNING) +logging.getLogger('jaeger_tracing').setLevel(logging.WARNING) + +# Set up our logging logger = structlog.wrap_logger(logging.getLogger('celery.es')) if config.DEBUG: - 
logging.getLogger('celery').setLevel(logging.INFO) logging.getLogger('celery.es').setLevel(logging.DEBUG) - logging.getLogger('jaeger_tracing').setLevel(logging.WARNING) + logging.getLogger('celery').setLevel(logging.INFO) + logger.info("Setting up celery worker") logger.debug("Debug logging enabled") From 389342a8f05bdbcc74d863a97523687b79ba8bcd Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 14 Nov 2018 11:08:46 +1100 Subject: [PATCH 07/15] No need to wrap fixtures in a try/except --- backend/entityservice/tests/conftest.py | 8 +++----- .../entityservice/tests/test_project_with_runs_deletes.py | 2 +- backend/entityservice/tests/util.py | 8 +++----- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/backend/entityservice/tests/conftest.py b/backend/entityservice/tests/conftest.py index f6d4fc50..5e8be566 100644 --- a/backend/entityservice/tests/conftest.py +++ b/backend/entityservice/tests/conftest.py @@ -109,11 +109,9 @@ def result_type(request): def project(request, requests): result_type = request.param project = create_project_no_data(requests, result_type) - try: - yield project - finally: - # Release project resource - delete_project(requests, project) + yield project + # Release project resource + delete_project(requests, project) @pytest.fixture(scope='function', params=ENCODING_SIZES) diff --git a/backend/entityservice/tests/test_project_with_runs_deletes.py b/backend/entityservice/tests/test_project_with_runs_deletes.py index a92ff874..9a62dbba 100644 --- a/backend/entityservice/tests/test_project_with_runs_deletes.py +++ b/backend/entityservice/tests/test_project_with_runs_deletes.py @@ -2,7 +2,7 @@ from entityservice.tests.util import post_run, delete_project, create_project_upload_fake_data -def test_deleteproject_after_creating_run_with_clks(requests, result_type): +def test_delete_project_after_creating_run_with_clks(requests, result_type): project, dp1, dp2 = create_project_upload_fake_data(requests, [100, 100], overlap=0.5, result_type=result_type) post_run(requests, project, 0.9) delete_project(requests, project) diff --git a/backend/entityservice/tests/util.py b/backend/entityservice/tests/util.py index 3412751e..710b1a39 100644 --- a/backend/entityservice/tests/util.py +++ b/backend/entityservice/tests/util.py @@ -87,11 +87,9 @@ def create_project_no_data(requests, result_type='mapping'): def temporary_blank_project(requests, result_type='mapping'): # Code to acquire resource, e.g.: project = create_project_no_data(requests, result_type) - try: - yield project - finally: - # Release project resource - delete_project(requests, project) + yield project + # Release project resource + delete_project(requests, project) def create_project_upload_fake_data(requests, size, overlap=0.75, result_type='mapping', encoding_size=128): From 090ce47a615849231f4349d1d51f8c912d5f994a Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 5 Dec 2018 13:20:10 +1000 Subject: [PATCH 08/15] Import tidy ups and minor code cleanup for pep8 (#302) * Import tidy ups and minor code cleanup for pep8 * Doc update --- backend/entityservice/__init__.py | 9 +++--- backend/entityservice/async_worker.py | 21 ++++++------- .../entityservice/database/authorization.py | 2 +- backend/entityservice/database/insertions.py | 3 +- backend/entityservice/database/metrics.py | 12 ++++++- backend/entityservice/database/util.py | 7 ++--- backend/entityservice/models/project.py | 3 +- backend/entityservice/models/run.py | 4 +-- backend/entityservice/tasks/base_task.py | 15 ++++----- 
backend/entityservice/tasks/comparing.py | 21 +++++++------ .../entityservice/tasks/encoding_uploading.py | 13 ++++---- backend/entityservice/tasks/permutation.py | 7 +++-- .../entityservice/tasks/project_cleanup.py | 6 ++-- backend/entityservice/tasks/run.py | 4 +-- backend/entityservice/tasks/solver.py | 1 - backend/entityservice/tasks/stats.py | 10 ++---- backend/entityservice/utils.py | 31 ++++++++----------- backend/entityservice/views/auth_checks.py | 5 +-- backend/entityservice/views/project.py | 29 ++++++++--------- .../entityservice/views/run/description.py | 3 +- docs/benchmarking.rst | 3 +- docs/conf.py | 6 ++-- docs/index.rst | 1 + docs/production-deployment.rst | 2 +- 24 files changed, 110 insertions(+), 108 deletions(-) diff --git a/backend/entityservice/__init__.py b/backend/entityservice/__init__.py index 8426c5ce..5e5511c1 100644 --- a/backend/entityservice/__init__.py +++ b/backend/entityservice/__init__.py @@ -31,10 +31,6 @@ # Logging setup logger = structlog.get_logger() -if config.LOGFILE is not None: - fileHandler = logging.FileHandler(config.LOGFILE) - fileHandler.setLevel(logging.INFO) - fileHandler.setFormatter(config.fileFormat) consoleHandler = logging.StreamHandler() consoleHandler.setLevel(logging.DEBUG) structlog.configure( @@ -62,7 +58,10 @@ app.logger.setLevel(gunicorn_logger.level) if config.LOGFILE is not None: - app.logger.addHandler(fileHandler) + fileHandler = logging.FileHandler(config.LOGFILE) + fileHandler.setLevel(logging.INFO) + fileHandler.setFormatter(config.fileFormat) + app.logger.addHandler(fileHandler) # Tracer setup (currently just trace all requests) flask_tracer = FlaskTracer(initialize_tracer, True, app) diff --git a/backend/entityservice/async_worker.py b/backend/entityservice/async_worker.py index c3474158..ec174ca3 100644 --- a/backend/entityservice/async_worker.py +++ b/backend/entityservice/async_worker.py @@ -1,25 +1,25 @@ import logging from celery import Celery -from celery.signals import task_prerun, task_postrun +from celery.signals import task_prerun import structlog -from entityservice.settings import Config as config +from entityservice.settings import Config celery = Celery('tasks', - broker=config.BROKER_URL, - backend=config.CELERY_RESULT_BACKEND + broker=Config.BROKER_URL, + backend=Config.CELERY_RESULT_BACKEND ) celery.conf.CELERY_TASK_SERIALIZER = 'json' celery.conf.CELERY_ACCEPT_CONTENT = ['json'] celery.conf.CELERY_RESULT_SERIALIZER = 'json' -celery.conf.CELERY_ANNOTATIONS = config.CELERY_ANNOTATIONS -celery.conf.CELERYD_PREFETCH_MULTIPLIER = config.CELERYD_PREFETCH_MULTIPLIER -celery.conf.CELERYD_MAX_TASKS_PER_CHILD = config.CELERYD_MAX_TASKS_PER_CHILD -celery.conf.CELERY_ACKS_LATE = config.CELERY_ACKS_LATE -celery.conf.CELERY_ROUTES = config.CELERY_ROUTES +celery.conf.CELERY_ANNOTATIONS = Config.CELERY_ANNOTATIONS +celery.conf.CELERYD_PREFETCH_MULTIPLIER = Config.CELERYD_PREFETCH_MULTIPLIER +celery.conf.CELERYD_MAX_TASKS_PER_CHILD = Config.CELERYD_MAX_TASKS_PER_CHILD +celery.conf.CELERY_ACKS_LATE = Config.CELERY_ACKS_LATE +celery.conf.CELERY_ROUTES = Config.CELERY_ROUTES structlog.configure( @@ -41,7 +41,7 @@ # Set up our logging logger = structlog.wrap_logger(logging.getLogger('celery.es')) -if config.DEBUG: +if Config.DEBUG: logging.getLogger('celery.es').setLevel(logging.DEBUG) logging.getLogger('celery').setLevel(logging.INFO) @@ -57,4 +57,3 @@ def configure_structlog(sender, body=None, **kwargs): task_id=kwargs['task_id'], task_name=sender.__name__ ) - diff --git 
a/backend/entityservice/database/authorization.py b/backend/entityservice/database/authorization.py index c7b88455..cebd0523 100644 --- a/backend/entityservice/database/authorization.py +++ b/backend/entityservice/database/authorization.py @@ -15,4 +15,4 @@ def check_project_auth(db, project_id, results_token): def check_update_auth(db, update_token): - return get_dataprovider_id(db, update_token) is not None \ No newline at end of file + return get_dataprovider_id(db, update_token) is not None diff --git a/backend/entityservice/database/insertions.py b/backend/entityservice/database/insertions.py index e23fa7fa..b0c31d4b 100644 --- a/backend/entityservice/database/insertions.py +++ b/backend/entityservice/database/insertions.py @@ -1,7 +1,6 @@ -# Insertion Queries - import psycopg2 import psycopg2.extras + from entityservice.database.util import execute_returning_id, logger from entityservice.errors import RunDeleted diff --git a/backend/entityservice/database/metrics.py b/backend/entityservice/database/metrics.py index 51d20c4f..eb5a07e6 100644 --- a/backend/entityservice/database/metrics.py +++ b/backend/entityservice/database/metrics.py @@ -1,6 +1,6 @@ from datetime import timedelta -from entityservice.database.util import query_db, execute_returning_id, logger +from entityservice.database.util import query_db, execute_returning_id def get_latest_rate(db): @@ -37,3 +37,13 @@ def insert_comparison_rate(cur, rate): RETURNING id; """ return execute_returning_id(cur, insertion_stmt, [rate]) + + +def get_elapsed_run_times(db): + elapsed_run_time_query = """ + select run_id, project as project_id, (time_completed - time_started) as elapsed + from runs + WHERE + runs.state='completed' + """ + return query_db(db, elapsed_run_time_query) diff --git a/backend/entityservice/database/util.py b/backend/entityservice/database/util.py index c6160fe5..014e6548 100644 --- a/backend/entityservice/database/util.py +++ b/backend/entityservice/database/util.py @@ -1,12 +1,11 @@ -import logging import time import psycopg2 import psycopg2.extras from flask import current_app, g from structlog import get_logger -from entityservice import database as db -from entityservice.errors import DatabaseInconsistent, DBResourceMissing + +from entityservice import database from entityservice.settings import Config as config logger = get_logger() @@ -102,5 +101,5 @@ def get_db(): conn = getattr(g, 'db', None) if conn is None: logger.debug("Caching a new database connection in application context") - conn = g.db = db.connect_db() + conn = g.db = database.connect_db() return conn diff --git a/backend/entityservice/models/project.py b/backend/entityservice/models/project.py index e8cfef06..6e177662 100644 --- a/backend/entityservice/models/project.py +++ b/backend/entityservice/models/project.py @@ -1,7 +1,8 @@ +from structlog import get_logger + from entityservice.messages import INVALID_RESULT_TYPE_MESSAGE from entityservice.utils import generate_code import entityservice.database as db -from structlog import get_logger logger = get_logger() diff --git a/backend/entityservice/models/run.py b/backend/entityservice/models/run.py index 0798078a..c94ebfd6 100644 --- a/backend/entityservice/models/run.py +++ b/backend/entityservice/models/run.py @@ -1,10 +1,10 @@ +from structlog import get_logger + import entityservice.database as db -from entityservice import app from entityservice import cache from entityservice.settings import Config as config from entityservice.utils import generate_code -from structlog import get_logger 
logger = get_logger() diff --git a/backend/entityservice/tasks/base_task.py b/backend/entityservice/tasks/base_task.py index 76de686d..89388a9f 100644 --- a/backend/entityservice/tasks/base_task.py +++ b/backend/entityservice/tasks/base_task.py @@ -1,15 +1,16 @@ -from entityservice.async_worker import celery, logger -from entityservice.database import logger, DBConn, update_run_mark_failure +import time +from abc import ABC -from entityservice.tracing import create_tracer from opentracing.propagation import Format +import psycopg2 -import time +from entityservice.async_worker import celery, logger +from entityservice.database import DBConn, update_run_mark_failure +from entityservice.tracing import create_tracer from entityservice.errors import DBResourceMissing -import psycopg2 -class BaseTask(celery.Task): +class BaseTask(celery.Task, ABC): """Abstract base class for all tasks in Entity Service""" abstract = True @@ -86,7 +87,7 @@ def __call__(self, *args, **kwargs): return super(TracedTask, self).__call__(*args, **kwargs) def after_return(self, status, retval, task_id, args, kwargs, einfo): - time.sleep(2) # jaeger bug + time.sleep(2) # jaeger bug self.tracer.close() self._tracer = None return super(TracedTask, self).after_return(status, retval, task_id, args, kwargs, einfo) diff --git a/backend/entityservice/tasks/comparing.py b/backend/entityservice/tasks/comparing.py index a58b3f1d..72722d66 100644 --- a/backend/entityservice/tasks/comparing.py +++ b/backend/entityservice/tasks/comparing.py @@ -1,21 +1,22 @@ + +import csv +import os import time import anonlink -import csv import minio -import os from celery import chord from entityservice.utils import fmt_bytes from entityservice.object_store import connect_to_object_store from entityservice.async_worker import celery, logger -from entityservice.database import connect_db, check_project_exists, check_run_exists, \ +from entityservice.database import check_project_exists, check_run_exists, \ get_project_dataset_sizes, update_run_mark_failure, get_project_encoding_size, get_filter_metadata, \ update_run_chunk, DBConn, get_project_column, get_dataprovider_ids, get_run from entityservice.models.run import progress_run_stage as progress_stage from entityservice.object_store import store_similarity_scores from entityservice.serialization import get_chunk_from_object_store -from entityservice.settings import Config as config +from entityservice.settings import Config from entityservice.tasks.base_task import TracedTask, celery_bug_fix, on_chord_error from entityservice.tasks.solver import solver_task from entityservice.tasks import mark_run_complete @@ -61,7 +62,7 @@ def create_comparison_jobs(project_id, run_id, parent_span=None): current_span.log_kv({"event": 'get-metadata'}) log.debug("Chunking computation task") - chunk_size = config.get_task_chunk_size(size, threshold) + chunk_size = Config.get_task_chunk_size(size, threshold) if chunk_size is None: chunk_size = max(lenf1, lenf2) log.info("Chunks will contain {} entities per task".format(chunk_size)) @@ -182,7 +183,7 @@ def compute_filter_similarity(chunk_info_dp1, chunk_info_dp2, project_id, run_id # Will write a csv file for now mc = connect_to_object_store() try: - mc.fput_object(config.MINIO_BUCKET, result_filename, result_filename) + mc.fput_object(Config.MINIO_BUCKET, result_filename, result_filename) except minio.ResponseError as err: log.warning("Failed to store result in minio") raise @@ -201,7 +202,7 @@ def compute_filter_similarity(chunk_info_dp1, chunk_info_dp2, project_id, 
run_id t4 - t3, t4 - t4, t6 - t5, - t6 - t0, ) + t6 - t0) ) return num_results, result_filename @@ -222,12 +223,12 @@ def aggregate_comparisons(similarity_result_files, project_id, run_id, parent_sp for num, filename in similarity_result_files: if num > 0: files.append(filename) - data_size += mc.stat_object(config.MINIO_BUCKET, filename).size + data_size += mc.stat_object(Config.MINIO_BUCKET, filename).size log.debug("Aggregating result chunks from {} files, total size: {}".format( len(files), fmt_bytes(data_size))) - result_file_stream_generator = (mc.get_object(config.MINIO_BUCKET, result_filename) for result_filename in files) + result_file_stream_generator = (mc.get_object(Config.MINIO_BUCKET, result_filename) for result_filename in files) log.info("Similarity score results are {}".format(fmt_bytes(data_size))) result_stream = chain_streams(result_file_stream_generator) @@ -250,7 +251,7 @@ def aggregate_comparisons(similarity_result_files, project_id, run_id, parent_sp # DB now committed, we can fire off tasks that depend on the new db state if result_type == "similarity_scores": log.info("Deleting intermediate similarity score files from object store") - mc.remove_objects(config.MINIO_BUCKET, files) + mc.remove_objects(Config.MINIO_BUCKET, files) log.debug("Removing clk filters from redis cache") remove_from_cache(dp_ids[0]) remove_from_cache(dp_ids[1]) diff --git a/backend/entityservice/tasks/encoding_uploading.py b/backend/entityservice/tasks/encoding_uploading.py index 57cd2a40..dd9acba8 100644 --- a/backend/entityservice/tasks/encoding_uploading.py +++ b/backend/entityservice/tasks/encoding_uploading.py @@ -9,11 +9,10 @@ InvalidEncodingError from entityservice.object_store import connect_to_object_store from entityservice.serialization import binary_pack_filters, deserialize_bitarray, binary_format -from entityservice.settings import Config as config +from entityservice.settings import Config from entityservice.async_worker import celery, logger from entityservice.tasks.base_task import TracedTask from entityservice.tasks.pre_run_check import check_for_executable_runs - from entityservice.utils import iterable_to_stream, fmt_bytes, clks_uploaded_to_project @@ -33,8 +32,8 @@ def handle_raw_upload(project_id, dp_id, receipt_token, parent_span=None): mc = connect_to_object_store() # Input file is line separated base64 record encodings. 
- raw_file = config.RAW_FILENAME_FMT.format(receipt_token) - raw_data_response = mc.get_object(config.MINIO_BUCKET, raw_file) + raw_file = Config.RAW_FILENAME_FMT.format(receipt_token) + raw_data_response = mc.get_object(Config.MINIO_BUCKET, raw_file) # Set up streaming processing pipeline buffered_stream = iterable_to_stream(raw_data_response.stream()) @@ -76,12 +75,12 @@ def filter_generator(): update_encoding_metadata_set_encoding_size(db, dp_id, uploaded_encoding_size) # Output file is our custom binary packed file - filename = config.BIN_FILENAME_FMT.format(receipt_token) + filename = Config.BIN_FILENAME_FMT.format(receipt_token) bit_packed_element_size = binary_format(uploaded_encoding_size).size num_bytes = expected_count * bit_packed_element_size # If small enough preload the data into our redis cache - if expected_count < config.ENTITY_CACHE_THRESHOLD: + if expected_count < Config.ENTITY_CACHE_THRESHOLD: log.info("Caching pickled clk data") python_filters = list(python_filters) cache.set_deserialized_filter(dp_id, python_filters) @@ -94,7 +93,7 @@ def filter_generator(): # Upload to object store log.info(f"Uploading {expected_count} encodings of size {uploaded_encoding_size} " + f"to object store. Total Size: {fmt_bytes(num_bytes)}") - mc.put_object(config.MINIO_BUCKET, filename, data=packed_filter_stream, length=num_bytes) + mc.put_object(Config.MINIO_BUCKET, filename, data=packed_filter_stream, length=num_bytes) with DBConn() as conn: update_encoding_metadata(conn, filename, dp_id, 'ready') diff --git a/backend/entityservice/tasks/permutation.py b/backend/entityservice/tasks/permutation.py index 468cc625..9a458187 100644 --- a/backend/entityservice/tasks/permutation.py +++ b/backend/entityservice/tasks/permutation.py @@ -2,8 +2,8 @@ from entityservice import cache from entityservice.async_worker import celery, logger -from entityservice.database import logger, DBConn, get_project_column, insert_mapping_result, get_dataprovider_ids, \ - connect_db, get_run_result, insert_permutation, insert_permutation_mask +from entityservice.database import DBConn, get_project_column, insert_mapping_result, get_dataprovider_ids, \ + get_run_result, insert_permutation, insert_permutation_mask from entityservice.tasks.base_task import TracedTask from entityservice.tasks import mark_run_complete from entityservice.tasks.stats import calculate_comparison_rate @@ -57,7 +57,8 @@ def permute_mapping_data(project_id, run_id, len_filters1, len_filters2, parent_ """ Task which will create a permutation after a mapping has been completed. 
- :param project_id: The project resource + :param project_id: The project resource id + :param run_id: The run id :param len_filters1: :param len_filters2: diff --git a/backend/entityservice/tasks/project_cleanup.py b/backend/entityservice/tasks/project_cleanup.py index 8f27ee9b..90c579c8 100644 --- a/backend/entityservice/tasks/project_cleanup.py +++ b/backend/entityservice/tasks/project_cleanup.py @@ -4,7 +4,7 @@ from entityservice.object_store import connect_to_object_store from entityservice.async_worker import celery, logger from entityservice.tasks.base_task import TracedTask -from entityservice.settings import Config as config +from entityservice.settings import Config @celery.task(base=TracedTask, @@ -35,6 +35,6 @@ def delete_minio_objects(filenames, project_id): mc = connect_to_object_store() log.info(f"Deleting {len(filenames)} files from object store") try: - mc.remove_objects(config.MINIO_BUCKET, filenames) + mc.remove_objects(Config.MINIO_BUCKET, filenames) except MinioError as e: - log.warning(f"Error occurred while removing object {filename}. Ignoring.") + log.warning(f"Error occurred while removing object {filenames}. Ignoring.") diff --git a/backend/entityservice/tasks/run.py b/backend/entityservice/tasks/run.py index 4e8e1cf4..09ae9b7b 100644 --- a/backend/entityservice/tasks/run.py +++ b/backend/entityservice/tasks/run.py @@ -1,5 +1,5 @@ -from entityservice.database import DBConn, logger, \ - check_project_exists, get_run, update_run_set_started, get_dataprovider_ids +from entityservice.database import DBConn, check_project_exists, get_run +from entityservice.database import update_run_set_started, get_dataprovider_ids from entityservice.errors import RunDeleted, ProjectDeleted from entityservice.tasks.base_task import TracedTask from entityservice.tasks.comparing import create_comparison_jobs diff --git a/backend/entityservice/tasks/solver.py b/backend/entityservice/tasks/solver.py index 0f3bf261..dfcf0a41 100644 --- a/backend/entityservice/tasks/solver.py +++ b/backend/entityservice/tasks/solver.py @@ -1,5 +1,4 @@ import anonlink -from structlog import get_logger from entityservice.object_store import connect_to_object_store from entityservice.async_worker import celery, logger diff --git a/backend/entityservice/tasks/stats.py b/backend/entityservice/tasks/stats.py index 8c1e12df..fc53c3ef 100644 --- a/backend/entityservice/tasks/stats.py +++ b/backend/entityservice/tasks/stats.py @@ -1,7 +1,7 @@ from datetime import timedelta from entityservice.async_worker import celery, logger -from entityservice.database import connect_db, logger, query_db +from entityservice.database import connect_db, get_elapsed_run_times from entityservice.database import get_total_comparisons_for_project from entityservice.database import insert_comparison_rate @@ -10,16 +10,10 @@ def calculate_comparison_rate(): dbinstance = connect_db() logger.info("Calculating global comparison rate") - elapsed_run_time_query = """ - select run_id, project as project_id, (time_completed - time_started) as elapsed - from runs - WHERE - runs.state='completed' - """ total_comparisons = 0 total_time = timedelta(0) - for run in query_db(dbinstance, elapsed_run_time_query): + for run in get_elapsed_run_times(dbinstance): comparisons = get_total_comparisons_for_project(dbinstance, run['project_id']) diff --git a/backend/entityservice/utils.py b/backend/entityservice/utils.py index 815e9f6e..f2ed73ab 100644 --- a/backend/entityservice/utils.py +++ b/backend/entityservice/utils.py @@ -3,18 +3,17 @@ import io import 
json import os -import logging import binascii import bitmath from flask import request from connexion import ProblemException -import structlog +from structlog import get_logger -from entityservice.database import connect_db, get_number_parties_uploaded, get_project_column, \ - get_number_parties_ready +from entityservice.database import connect_db, get_number_parties_uploaded, get_number_parties_ready +from entityservice.database import get_project_column -logger = structlog.wrap_logger(logging.getLogger('celery.es')) +logger = get_logger() def fmt_bytes(num_bytes): @@ -133,10 +132,10 @@ def safe_fail_request(status_code, message, **kwargs): # Connection reset by peer) (See issue #195) if 'Transfer-Encoding' in request.headers and request.headers['Transfer-Encoding'] == 'chunked': chunk_size = 4096 - for data in request.input_stream.read(chunk_size): + for _ in request.input_stream.read(chunk_size): pass else: - data = request.get_json() + _ = request.get_json() raise ProblemException(status=status_code, detail=message, **kwargs) @@ -199,15 +198,11 @@ def similarity_matrix_from_csv_bytes(data): def convert_mapping_to_list(permutation): """Convert the permutation from a dict mapping into a list - Assumes the keys and values of the given dict are numbers in the - inclusive range from 0 to length. Note the keys should be int. - - Returns a list of the values from the passed dict - in the order - defined by the keys. + :param dict permutation: + Assumes the keys and values of the given dict are numbers in the + inclusive range from 0 to length. Note the keys should be int. + :return: + A list of the values from the passed dict - in the order + defined by the keys. """ - l = len(permutation) - - perm_list = [] - for j in range(l): - perm_list.append(permutation[j]) - return perm_list + return [permutation[i] for i in range(len(permutation))] diff --git a/backend/entityservice/views/auth_checks.py b/backend/entityservice/views/auth_checks.py index bf314d65..4a68ad8a 100644 --- a/backend/entityservice/views/auth_checks.py +++ b/backend/entityservice/views/auth_checks.py @@ -1,8 +1,9 @@ -from entityservice import database as db, app +from structlog import get_logger + +from entityservice import database as db from entityservice.database import get_db, get_project_column from entityservice.messages import INVALID_ACCESS_MSG from entityservice.utils import safe_fail_request -from structlog import get_logger logger = get_logger() diff --git a/backend/entityservice/views/project.py b/backend/entityservice/views/project.py index bd276962..1411cb47 100644 --- a/backend/entityservice/views/project.py +++ b/backend/entityservice/views/project.py @@ -16,7 +16,7 @@ abort_if_invalid_results_token, get_authorization_token_type_or_abort from entityservice import models from entityservice.object_store import connect_to_object_store -from entityservice.settings import Config as config +from entityservice.settings import Config from entityservice.views.serialization import ProjectList, NewProjectResponse, ProjectDescription logger = get_logger() @@ -121,6 +121,8 @@ def project_clks_post(project_id): log = log.bind(dp_id=dp_id) log.info("Receiving CLK data.") + receipt_token = None + with opentracing.tracer.start_span('upload-data', child_of=parent_span) as span: span.set_tag("project_id", project_id) if headers['Content-Type'] == "application/json": @@ -143,7 +145,7 @@ def project_clks_post(project_id): count, size = check_binary_upload_headers(headers) log.info(f"Headers tell us to expect {count} encodings of 
{size} bytes") span.log_kv({'count': count, 'size': size}) - except: + except Exception: log.warning("Upload failed due to problem with headers in binary upload") raise # Check against project level encoding size (if it has been set) @@ -154,7 +156,7 @@ def project_clks_post(project_id): # TODO actually stream the upload data straight to Minio. Currently we can't because # connexion has already read the data before our handler is called! # https://github.com/zalando/connexion/issues/592 - #stream = get_stream() + # stream = get_stream() stream = BytesIO(request.data) log.debug(f"Stream size is {len(request.data)} B, and we expect {(6 + size)* count} B") if len(request.data) != (6 + size) * count: @@ -177,16 +179,15 @@ def get_header_int(header, min=None, max=None): INVALID_HEADER_NUMBER = "Invalid value for {} header".format(header) try: value = int(headers[header]) + if min is not None and value < min: + safe_fail_request(400, INVALID_HEADER_NUMBER) + if max is not None and value > max: + safe_fail_request(400, INVALID_HEADER_NUMBER) + return value except ValueError: safe_fail_request(400, INVALID_HEADER_NUMBER) - if min is not None and value < min: - safe_fail_request(400, INVALID_HEADER_NUMBER) - if max is not None and value > max: - safe_fail_request(400, INVALID_HEADER_NUMBER) - return value - - size = get_header_int('Hash-Size', min=config.MIN_ENCODING_SIZE, max=config.MAX_ENCODING_SIZE) + size = get_header_int('Hash-Size', min=Config.MIN_ENCODING_SIZE, max=Config.MAX_ENCODING_SIZE) count = get_header_int('Hash-Count', min=1) return count, size @@ -218,7 +219,7 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128): """ receipt_token = generate_code() - filename = config.BIN_FILENAME_FMT.format(receipt_token) + filename = Config.BIN_FILENAME_FMT.format(receipt_token) # Set the state to 'pending' in the bloomingdata table with DBConn() as conn: db.insert_encoding_metadata(conn, filename, dp_id, receipt_token, count) @@ -236,7 +237,7 @@ def upload_clk_data_binary(project_id, dp_id, raw_stream, count, size=128): with opentracing.tracer.start_span('save-to-minio', child_of=parent_span) as span: mc = connect_to_object_store() try: - mc.put_object(config.MINIO_BUCKET, filename, data=raw_stream, length=num_bytes) + mc.put_object(Config.MINIO_BUCKET, filename, data=raw_stream, length=num_bytes) except (minio.error.InvalidSizeError, minio.error.InvalidArgumentError, minio.error.ResponseError): logger.info("Mismatch between expected stream length and header info") raise ValueError("Mismatch between expected stream length and header info") @@ -266,7 +267,7 @@ def upload_json_clk_data(dp_id, clk_json, parent_span): receipt_token = generate_code() - filename = config.RAW_FILENAME_FMT.format(receipt_token) + filename = Config.RAW_FILENAME_FMT.format(receipt_token) logger.info("Storing user {} supplied clks from json".format(dp_id)) with opentracing.tracer.start_span('clk-splitting', child_of=parent_span) as span: @@ -283,7 +284,7 @@ def upload_json_clk_data(dp_id, clk_json, parent_span): span.set_tag('filename', filename) mc = connect_to_object_store() mc.put_object( - config.MINIO_BUCKET, + Config.MINIO_BUCKET, filename, data=buffer, length=num_bytes diff --git a/backend/entityservice/views/run/description.py b/backend/entityservice/views/run/description.py index 5ab3c72c..5bd6ec7f 100644 --- a/backend/entityservice/views/run/description.py +++ b/backend/entityservice/views/run/description.py @@ -1,6 +1,7 @@ from flask import request from structlog import get_logger -from 
entityservice import app, database as db + +from entityservice import database as db from entityservice.views.auth_checks import abort_if_run_doesnt_exist, abort_if_invalid_results_token from entityservice.views.serialization import RunDescription diff --git a/docs/benchmarking.rst b/docs/benchmarking.rst index a084979c..6f042e7e 100644 --- a/docs/benchmarking.rst +++ b/docs/benchmarking.rst @@ -13,7 +13,7 @@ The container/script is configured via environment variables. - ``RESULT_PATH``: full filename to write results file. - ``SCHEMA``: path to the linkage schema file used when creating projects. If not provided it is assumed to be in the data directory. -- ``TIMEOUT ``: this timeout defined the time to wait for the result of a run in seconds. Default is 1200 (20min). +- ``TIMEOUT``: this timeout defined the time to wait for the result of a run in seconds. Default is 1200 (20min). Run Benchmarking Container @@ -83,4 +83,3 @@ Experiments to run can be configured as a simple json document. The default is:: The schema of the experiments can be found in ``benchmarking/schema/experiments.json``. - diff --git a/docs/conf.py b/docs/conf.py index 0fc913fb..28d13ae5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -178,11 +178,13 @@ #'anonlink': ('http://anonlink.readthedocs.io/en/latest/', None) } - +# Configure OpenAPI rendering +# https://sphinxcontrib-redoc.readthedocs.io/en/stable/ redoc = [ { 'name': 'Entity Service API', 'page': 'api', 'spec': '_static/swagger.yaml', + 'embed': True } -] \ No newline at end of file +] diff --git a/docs/index.rst b/docs/index.rst index 895640fb..fd2f7f8a 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -55,3 +55,4 @@ Table Of Contents security deployment development + benchmarking diff --git a/docs/production-deployment.rst b/docs/production-deployment.rst index cb898129..210f98b6 100644 --- a/docs/production-deployment.rst +++ b/docs/production-deployment.rst @@ -161,7 +161,7 @@ At deployment time you can decide to deploy MINIO or instead use an existing ser a trade off between using a local deployment of minio vs S3. In our AWS based experimentation Minio is noticeably faster, but more expensive and less reliable than AWS S3, your own -millage may vary. +mileage may vary. To configure a deployment to use an external object store, simply set ``provision.minio`` to ``false`` and add appropriate connection configuration in the ``minio`` section. 
For example to use AWS S3 simply provide your access From 438a5bb30d6b6cd653fdda9f20c81cd27f95f0b2 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 5 Dec 2018 20:17:36 +1100 Subject: [PATCH 09/15] Small adjustment to test to expose run_id on failure --- backend/entityservice/tests/test_results_correctness.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/entityservice/tests/test_results_correctness.py b/backend/entityservice/tests/test_results_correctness.py index d5a6ac82..7752000c 100644 --- a/backend/entityservice/tests/test_results_correctness.py +++ b/backend/entityservice/tests/test_results_correctness.py @@ -46,6 +46,7 @@ def test_similarity_scores(requests, the_truth): delete_project(requests, project_data) + def test_mapping(requests, the_truth): project_data, _, _ = create_project_upload_data(requests, the_truth['clks_a'], the_truth['clks_b'], result_type='mapping') @@ -72,9 +73,10 @@ def test_permutation(requests, the_truth): mapping = the_truth['mapping'] for a, b, m in zip(permutation_a, permutation_b, mask_result['mask']): if m == 1: - assert mapping[a] == b + assert a in mapping, f"Unexpected link was included - run {run}" + assert mapping[a] == b, f"Expected link from {a} was incorrect - run {run}" else: - assert a not in mapping + assert a not in mapping, f"Expected link was masked out - run {run}" def apply_permutation(items, permutation): From eb70482113185e6f8858f7a4928ba9c02f0e62ab Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 5 Dec 2018 20:19:36 +1100 Subject: [PATCH 10/15] Set worker's logging level to debug in docker-compose --- backend/entityservice/settings.py | 2 +- tools/docker-compose.yml | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/backend/entityservice/settings.py b/backend/entityservice/settings.py index 328dcfc7..b74916d0 100644 --- a/backend/entityservice/settings.py +++ b/backend/entityservice/settings.py @@ -36,7 +36,7 @@ class Config(object): DATABASE_PASSWORD = os.getenv('DATABASE_PASSWORD', '') BROKER_URL = os.getenv('CELERY_BROKER_URL', - 'redis://:{}@{}:6379/0'.format(REDIS_PASSWORD, REDIS_SERVER)) + 'redis://:{}@{}:6379/0'.format(REDIS_PASSWORD, REDIS_SERVER)) CELERY_RESULT_BACKEND = BROKER_URL CELERY_ANNOTATIONS = { diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index 5d8a8f76..b9f35b08 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -59,10 +59,8 @@ services: entrypoint: celery -A entityservice.async_worker worker --loglevel=info -Q celery,compute,highmemory environment: - DATABASE_PASSWORD=rX%QpV7Xgyrz - - ENTITY_MATCH_THRESHOLD=0.90 - - ENCRYPTION_MIN_KEY_LENGTH=1024 - - DEBUG=false - - LOGGING_LEVEL=INFO + - DEBUG=true + - LOGGING_LEVEL=DEBUG - MINIO_ACCESS_KEY=AKIAIOSFODNN7EXAMPLE - MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY - CELERY_ACKS_LATE=true From e84a69f1ee1b4508d0f79b1b498fa7a52240a8d3 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Wed, 5 Dec 2018 21:36:31 +1100 Subject: [PATCH 11/15] Fix a race condition in scheduling run/list tasks Finish the database transaction before calling worker task. 
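[Editorial note] In broad strokes, the fix enforces this ordering (a condensed sketch drawn from the diff below, not the verbatim change): perform all database writes inside the transaction, and only enqueue the Celery task once that transaction has committed, so the worker cannot observe uncommitted state.

    # Sketch only: names are taken from the diff below.
    with db.DBConn() as db_conn:
        run_model.save(db_conn)
        project_object = db.get_project(db_conn, project_id)
        parties_contributed = db.get_number_parties_uploaded(db_conn, project_id)
        ready_to_run = parties_contributed == project_object['parties']
        if ready_to_run:
            update_run_mark_queued(db_conn, run_model.run_id)
    # The DBConn context manager has committed by this point, so the worker
    # task scheduled here will see the queued run.
    if ready_to_run:
        check_for_executable_runs.delay(project_id, serialize_span(span))
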
--- backend/entityservice/views/run/list.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/backend/entityservice/views/run/list.py b/backend/entityservice/views/run/list.py index 81b0ce4a..74da4a6b 100644 --- a/backend/entityservice/views/run/list.py +++ b/backend/entityservice/views/run/list.py @@ -39,23 +39,26 @@ def post(project_id, run): run_model = Run.from_json(run, project_id) log.debug("Saving run") + with db.DBConn() as db_conn: run_model.save(db_conn) - project_object = db.get_project(db_conn, project_id) parties_contributed = db.get_number_parties_uploaded(db_conn, project_id) + ready_to_run = parties_contributed == project_object['parties'] log.debug("Expecting {} parties to upload data. Have received {}".format( project_object['parties'], parties_contributed )) - if parties_contributed == project_object['parties']: + if ready_to_run: log.info("Scheduling task to carry out all runs for project {} now".format(project_id)) update_run_mark_queued(db_conn, run_model.run_id) - span = g.flask_tracer.get_span() - span.set_tag("run_id", run_model.run_id) - span.set_tag("project_id", run_model.project_id) - check_for_executable_runs.delay(project_id, serialize_span(span)) else: log.info("Task queued but won't start until CLKs are all uploaded") + + if ready_to_run: + span = g.flask_tracer.get_span() + span.set_tag("run_id", run_model.run_id) + span.set_tag("project_id", run_model.project_id) + check_for_executable_runs.delay(project_id, serialize_span(span)) return RunDescription().dump(run_model), 201 From cfaa12f2f8b1be6dcacdc198de1c57630ac4d7ec Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Thu, 6 Dec 2018 09:59:21 +1100 Subject: [PATCH 12/15] Another instance of calling a task before committing db transaction --- backend/entityservice/tasks/pre_run_check.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/backend/entityservice/tasks/pre_run_check.py b/backend/entityservice/tasks/pre_run_check.py index d365381c..309137dc 100644 --- a/backend/entityservice/tasks/pre_run_check.py +++ b/backend/entityservice/tasks/pre_run_check.py @@ -16,7 +16,7 @@ def check_for_executable_runs(project_id, parent_span=None): after all dataproviders have uploaded CLKs, and the CLKS are ready. 
""" log = logger.bind(pid=project_id) - log.info("Checking for runs that need to be executed") + log.debug("Checking for runs that need to be executed") if not clks_uploaded_to_project(project_id, check_data_ready=True): return @@ -27,15 +27,20 @@ def check_for_executable_runs(project_id, parent_span=None): log.warning(e.args[0]) # todo make sure this can be exposed to user return - new_runs = get_created_runs_and_queue(conn, project_id) - log.info("Creating tasks for {} created runs for project {}".format(len(new_runs), project_id)) + + log.debug("Progressing run stages") for qr in new_runs: - run_id = qr[0] - log.info('Queueing run for computation', run_id=run_id) # Record that the run has reached a new stage + run_id = qr[0] progress_stage(conn, run_id) - prerun_check.delay(project_id, run_id, check_for_executable_runs.get_serialized_span()) + + # commit db changes before scheduling following tasks + log.debug("Creating tasks for {} created runs for project {}".format(len(new_runs), project_id)) + for qr in new_runs: + run_id = qr[0] + log.info('Queueing run for computation', run_id=run_id) + prerun_check.delay(project_id, run_id, check_for_executable_runs.get_serialized_span()) def check_and_set_project_encoding_size(project_id, conn): From e7947982abde1eb658e5592d2df5e6cd6f5a058f Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Fri, 7 Dec 2018 00:14:23 +1100 Subject: [PATCH 13/15] Use row based locking to avoid a race condition Fixes #305 --- backend/entityservice/database/selections.py | 14 ++++++++++++++ backend/entityservice/tasks/run.py | 19 ++++++++++++------- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/backend/entityservice/database/selections.py b/backend/entityservice/database/selections.py index 9c65e652..110136db 100644 --- a/backend/entityservice/database/selections.py +++ b/backend/entityservice/database/selections.py @@ -130,6 +130,20 @@ def get_runs(db, project_id): return query_db(db, select_query, [project_id], one=False) +def get_run_state_for_update(db, run_id): + """ + Get the current run state and acquire a row lock + (or fail fast if row already locked) + """ + sql_query = """ + SELECT state from runs + WHERE + run_id = %s + FOR UPDATE NOWAIT + """ + return query_db(db, sql_query, [run_id], one=True)['state'] + + def get_run(db, run_id): sql_query = """ SELECT * from runs diff --git a/backend/entityservice/tasks/run.py b/backend/entityservice/tasks/run.py index 09ae9b7b..2d413b26 100644 --- a/backend/entityservice/tasks/run.py +++ b/backend/entityservice/tasks/run.py @@ -1,4 +1,6 @@ -from entityservice.database import DBConn, check_project_exists, get_run +import psycopg2 + +from entityservice.database import DBConn, check_project_exists, get_run, get_run_state_for_update from entityservice.database import update_run_set_started, get_dataprovider_ids from entityservice.errors import RunDeleted, ProjectDeleted from entityservice.tasks.base_task import TracedTask @@ -13,19 +15,22 @@ def prerun_check(project_id, run_id, parent_span=None): with DBConn() as conn: if not check_project_exists(conn, project_id): - log.info("Project not found. Skipping") + log.debug("Project not found. Skipping") raise ProjectDeleted(project_id) res = get_run(conn, run_id) if res is None: - log.info(f"Run not found. Skipping") + log.debug(f"Run not found. Skipping") raise RunDeleted(run_id) - if res['state'] in {'completed', 'error'}: - log.info("Run is already finished. 
Skipping") + try: + state = get_run_state_for_update(conn, run_id) + except psycopg2.OperationalError: + log.warning("Run started in another task. Skipping this race.") return - if res['state'] == 'running': - log.info("Run already started. Skipping") + + if state in {'running', 'completed', 'error'}: + log.warning("Run already started. Skipping") return log.debug("Setting run as in progress") From 5902917d7f625c12c96baa6dbf35148462810ec1 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 11 Dec 2018 13:43:46 +1100 Subject: [PATCH 14/15] Bump version to 1.9.2 and add changelog. --- backend/entityservice/VERSION | 2 +- deployment/entity-service/versions.yaml | 6 +++--- docs/changelog.rst | 7 +++++++ docs/conf.py | 2 +- tools/docker-compose.yml | 4 ++-- 5 files changed, 14 insertions(+), 7 deletions(-) diff --git a/backend/entityservice/VERSION b/backend/entityservice/VERSION index ba1e8bf0..b95e90dc 100644 --- a/backend/entityservice/VERSION +++ b/backend/entityservice/VERSION @@ -1 +1 @@ -v1.9.1 +v1.9.2 diff --git a/deployment/entity-service/versions.yaml b/deployment/entity-service/versions.yaml index 83de0cff..e3fa85fb 100644 --- a/deployment/entity-service/versions.yaml +++ b/deployment/entity-service/versions.yaml @@ -4,11 +4,11 @@ api: tag: "v1.4.0-develop" app: image: - tag: "v1.9.1-develop" + tag: "v1.9.2-develop" dbinit: image: - tag: "v1.9.1-develop" + tag: "v1.9.2-develop" workers: image: - tag: "v1.9.1-develop" + tag: "v1.9.2-develop" diff --git a/docs/changelog.rst b/docs/changelog.rst index 01759303..edddc55d 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -4,6 +4,13 @@ Changelog ========= +Version 1.9.2 +------------- + +- 2 race conditions have been identified and fixed. +- Integration tests are speed up and more focused. They now fail after the first failure. +- Code tidy-ups to be more pep8 compliant. + Version 1.9.1 ------------- diff --git a/docs/conf.py b/docs/conf.py index 28d13ae5..a0c5cb60 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -68,7 +68,7 @@ # The short X.Y version. version = '1.9' # The full version, including alpha/beta/rc tags. -release = '1.9.1' +release = '1.9.2' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index b9f35b08..18a78ce8 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -74,8 +74,8 @@ services: nginx: image: quay.io/n1analytics/entity-nginx - #ports: - # - 8851:8851 + ports: + - 8851:8851 depends_on: - backend environment: From e794b1405b3778734c78b48087ca0a1475195369 Mon Sep 17 00:00:00 2001 From: Brian Thorne Date: Tue, 11 Dec 2018 15:03:09 +1100 Subject: [PATCH 15/15] Tweak changelog for v1.9.2 --- docs/changelog.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index edddc55d..7a67c946 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -8,7 +8,7 @@ Version 1.9.2 ------------- - 2 race conditions have been identified and fixed. -- Integration tests are speed up and more focused. They now fail after the first failure. +- Integration tests are sped up and more focused. The test suite now fails after the first test failure. - Code tidy-ups to be more pep8 compliant. Version 1.9.1