diff --git a/lemur/certificates/service.py b/lemur/certificates/service.py index 6c77e95f24..7cd3a6c26a 100644 --- a/lemur/certificates/service.py +++ b/lemur/certificates/service.py @@ -481,17 +481,8 @@ def create(**kwargs): """ Creates a new certificate. """ - # Validate destinations do not overlap accounts for the same plugin if "destinations" in kwargs: - dest_plugin_accounts = {} - for dest in kwargs["destinations"]: - plugin_accounts = dest_plugin_accounts.setdefault(dest.plugin_name, {}) - account = get_plugin_option("accountNumber", dest.options) - dest_plugin = plugins.get(dest.plugin_name) - if account in plugin_accounts and not dest_plugin.allow_multiple_per_account(): - raise Exception(f"Duplicate destinations for plugin {dest.plugin_name} and account {account} are not " - f"allowed") - plugin_accounts[account] = True + validate_no_duplicate_destinations(kwargs["destinations"]) try: cert_body, private_key, cert_chain, external_id, csr = mint(**kwargs) @@ -561,6 +552,21 @@ def create(**kwargs): return cert +def validate_no_duplicate_destinations(destinations): + """ + Validates destinations do not overlap accounts for the same plugin (for plugins that don't allow duplicates). + """ + dest_plugin_accounts = {} + for dest in destinations: + plugin_accounts = dest_plugin_accounts.setdefault(dest.plugin_name, {}) + account = get_plugin_option("accountNumber", dest.options) + dest_plugin = plugins.get(dest.plugin_name) + if account in plugin_accounts and not dest_plugin.allow_multiple_per_account(): + raise Exception(f"Duplicate destinations for plugin {dest.plugin_name} and account {account} are not " + f"allowed") + plugin_accounts[account] = True + + def render(args): """ Helper function that allows use to render our REST Api. diff --git a/lemur/certificates/views.py b/lemur/certificates/views.py index 5542996484..e539aebe18 100644 --- a/lemur/certificates/views.py +++ b/lemur/certificates/views.py @@ -11,6 +11,7 @@ from flask import Blueprint, make_response, jsonify, g, current_app from flask_restful import reqparse, Api, inputs +from lemur.certificates.service import validate_no_duplicate_destinations from lemur.common import validators from lemur.plugins.bases.authorization import UnauthorizedError from sentry_sdk import capture_exception @@ -919,6 +920,14 @@ def put(self, certificate_id, data=None): 403, ) + try: + validate_no_duplicate_destinations(data["destinations"]) + except Exception as e: + return ( + dict(message=str(e)), + 400, + ) + for destination in data["destinations"]: if destination.plugin.requires_key: if not cert.private_key: diff --git a/lemur/tests/conftest.py b/lemur/tests/conftest.py index a14fd1faf9..238d75d92a 100644 --- a/lemur/tests/conftest.py +++ b/lemur/tests/conftest.py @@ -38,6 +38,7 @@ CACertificateFactory, DnsProviderFactory, OptionalCNAuthorityFactory, + DuplicateAllowedDestinationFactory, ) @@ -145,6 +146,13 @@ def destination(session): return d +@pytest.fixture +def duplicate_allowed_destination(session): + d = DuplicateAllowedDestinationFactory() + session.commit() + return d + + @pytest.fixture def source(session): s = SourceFactory() @@ -283,6 +291,15 @@ def destination_plugin(): return TestDestinationPlugin +@pytest.fixture +def duplicate_allowed_destination_plugin(): + from lemur.plugins.base import register + from .plugins.destination_plugin import TestDestinationPluginDuplicatesAllowed + + register(TestDestinationPluginDuplicatesAllowed) + return TestDestinationPluginDuplicatesAllowed + + @pytest.fixture def source_plugin(): from lemur.plugins.base import register diff --git a/lemur/tests/factories.py b/lemur/tests/factories.py index e413639a8c..3cc994422d 100644 --- a/lemur/tests/factories.py +++ b/lemur/tests/factories.py @@ -209,6 +209,22 @@ class DestinationFactory(BaseFactory): plugin_name = "test-destination" label = Sequence(lambda n: "destination{0}".format(n)) + options = [{"name": "exportPlugin", "type": "export-plugin", "value": {"plugin_options": [{}]}}, + {"name": "accountNumber", "type": "str", "value": "1234567890"}] + + class Meta: + """Factory Configuration.""" + + model = Destination + + +class DuplicateAllowedDestinationFactory(BaseFactory): + """Destination factory.""" + + plugin_name = "test-destination-dupe-allowed" + label = Sequence(lambda n: "duplicate-allowed-destination{0}".format(n)) + options = [{"name": "exportPlugin", "type": "export-plugin", "value": {"plugin_options": [{}]}}, + {"name": "accountNumber", "type": "str", "value": "1234567890"}] class Meta: """Factory Configuration.""" diff --git a/lemur/tests/plugins/destination_plugin.py b/lemur/tests/plugins/destination_plugin.py index d1eb671179..44e0f53398 100644 --- a/lemur/tests/plugins/destination_plugin.py +++ b/lemur/tests/plugins/destination_plugin.py @@ -14,3 +14,21 @@ def __init__(self, *args, **kwargs): def upload(self, name, body, private_key, cert_chain, options, **kwargs): return + + +class TestDestinationPluginDuplicatesAllowed(DestinationPlugin): + title = "Test with Duplicates Allowed" + slug = "test-destination-dupe-allowed" + description = "Enables testing; allows duplicates" + + author = "Jasmine Schladen" + author_url = "https://github.com/netflix/lemur.git" + + def __init__(self, *args, **kwargs): + super(TestDestinationPluginDuplicatesAllowed, self).__init__(*args, **kwargs) + + def allow_multiple_per_account(self): + return True + + def upload(self, name, body, private_key, cert_chain, options, **kwargs): + return diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index 2a5ea927be..cc789900b9 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -6,24 +6,24 @@ import threading from http.server import HTTPServer, SimpleHTTPRequestHandler from tempfile import NamedTemporaryFile +from unittest.mock import patch import arrow import pytest from cryptography import x509 -from cryptography.x509.oid import ExtensionOID from cryptography.hazmat.backends import default_backend -from marshmallow import ValidationError +from cryptography.x509.oid import ExtensionOID from freezegun import freeze_time -from unittest.mock import patch - +from marshmallow import ValidationError from sqlalchemy.testing import fail -from lemur.certificates.service import create_csr, identify_and_persist_expiring_deployed_certificates +from lemur.certificates.service import create_csr, identify_and_persist_expiring_deployed_certificates, \ + reissue_certificate from lemur.certificates.views import * # noqa from lemur.common import utils from lemur.domains.models import Domain +from lemur.tests.factories import DestinationFactory, DuplicateAllowedDestinationFactory from lemur.tests.test_messaging import create_cert_that_expires_in_days - from lemur.tests.vectors import ( VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, @@ -1821,3 +1821,89 @@ def test_query_common_name(session): cn2_valid_certs = query_common_name(cn2, {"owner": "", "page": "", "count": ""}) assert len(cn2_valid_certs) == 1 + + +def test_reissue_certificate_with_duplicate_destinations_not_allowed(session, + logged_in_user, + crypto_authority, + issuer_plugin, + destination_plugin, + certificate): + # test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead. + certificate.authority = crypto_authority + + destination1 = DestinationFactory() + destination2 = DestinationFactory() + certificate.destinations.append(destination1) + certificate.destinations.append(destination2) + with pytest.raises(Exception, match='Duplicate destinations for plugin test-destination and account 1234567890 ' + 'are not allowed'): + reissue_certificate(certificate) + + +def test_reissue_certificate_with_duplicate_destinations_allowed(session, + logged_in_user, + crypto_authority, + issuer_plugin, + duplicate_allowed_destination_plugin, + certificate): + # test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead. + certificate.authority = crypto_authority + + destination1 = DuplicateAllowedDestinationFactory() + destination2 = DuplicateAllowedDestinationFactory() + certificate.destinations.append(destination1) + certificate.destinations.append(destination2) + new_cert = reissue_certificate(certificate) + assert new_cert + assert len(new_cert.destinations) == 2 + assert destination1 in new_cert.destinations + assert destination2 in new_cert.destinations + + +def test_certificate_update_duplicate_destinations_not_allowed(client, crypto_authority, certificate, issuer_plugin, + destination_plugin): + # test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead. + certificate.authority = crypto_authority + + destination1 = DestinationFactory() + destination2 = DestinationFactory() + certificate.destinations.append(destination1) + certificate.destinations.append(destination2) + + resp = client.put( + api.url_for(Certificates, certificate_id=certificate.id), + data=json.dumps( + certificate_output_schema.dump(certificate).data + ), + headers=VALID_ADMIN_HEADER_TOKEN, + ) + assert resp.status_code == 400 + assert 'Duplicate destinations for plugin test-destination and account 1234567890 are not allowed' \ + in resp.json['message'] + + +def test_certificate_update_duplicate_destinations_allowed(client, crypto_authority, certificate, issuer_plugin, + duplicate_allowed_destination_plugin): + from lemur.destinations.schemas import destination_output_schema + + # test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead. + certificate.authority = crypto_authority + + destination1 = DuplicateAllowedDestinationFactory() + destination2 = DuplicateAllowedDestinationFactory() + certificate.destinations.append(destination1) + certificate.destinations.append(destination2) + + resp = client.put( + api.url_for(Certificates, certificate_id=certificate.id), + data=json.dumps( + certificate_output_schema.dump(certificate).data + ), + headers=VALID_ADMIN_HEADER_TOKEN, + ) + assert resp.status_code == 200 + resp_cert = resp.json + assert len(resp_cert['destinations']) == 2 + assert destination_output_schema.dump(destination1).data in resp_cert['destinations'] + assert destination_output_schema.dump(destination2).data in resp_cert['destinations']