From 79072d761e3e1e4fcb00ec324bb6c3f4d13f7dbd Mon Sep 17 00:00:00 2001 From: Stephen Date: Tue, 26 May 2020 13:56:12 -0700 Subject: [PATCH] -Added test cases for changes made to client.py --- ...tificate-example-encrypted-private-key.der | Bin 0 -> 879 bytes ...vate-key-example-encrypted-private-key.pem | 30 +++++++ tests/test_crypto_connect.py | 84 +++++++++++++++--- 3 files changed, 101 insertions(+), 13 deletions(-) create mode 100644 examples/certificates/peer-certificate-example-encrypted-private-key.der create mode 100644 examples/certificates/peer-private-key-example-encrypted-private-key.pem diff --git a/examples/certificates/peer-certificate-example-encrypted-private-key.der b/examples/certificates/peer-certificate-example-encrypted-private-key.der new file mode 100644 index 0000000000000000000000000000000000000000..e0d40cedb58b67574808a5b05ce744489072fea4 GIT binary patch literal 879 zcmXqLV$L>bVhUcs%*4pVBqH7vYgRh#pi1-Ul7L&GE6UG#hzJ?*vT6sPP1 z$@7}_y}CQ?0Q19V4}~8-3$YWj2(+qC-oN~AOt#XORi$f1YJSQ5o4ri#iT=)d$=$mT zNlZMjeWIl4UVXR4i(}s=*X2zq+Tz^U$i&RZz_>WrAkcsx7-O=+jEw(TSPhtg6gUcH zm02VV#2Q4RM7TSdF7G}T=yT}AOV|7#Ebc9<4P-$&_*lePuuG$+8(^?AGDL(LS{+;a zoOR#xwZ#GAN7WZj{B}!n_EhITOG4j2+)&?KnYQfo-Z(41l?kzW&A#X4@*fNGzN_2t z`op@y>+A(q6)Mh=jr~W8qMH|dEBR#nXno-)vdi zVs}AVYFQu4gNF+h3+`AI`e|^9J#DO8#x;pE)9(G#<)t#R%TFB0T=w~7WBSS;{EA|k I68= (3, 6): from asyncio import TimeoutError else: @@ -45,6 +46,31 @@ "private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-2.pem" } +encrypted_private_key_peer_creds = { + "private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-encrypted-private-key.pem", + "certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-encrypted-private-key.der", + "password": b"password" +} + + +@pytest.fixture(params=srv_crypto_params) +async def srv_crypto_encrypted_key_one_cert(request): + srv = Server() + peer_certificate = encrypted_private_key_peer_creds["certificate"] + cert_handler = CertificateHandler() + key, cert = request.param + await cert_handler.trust_certificate(peer_certificate) + await srv.init() + srv.set_endpoint(uri_crypto_cert) + srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt], + certificate_handler=cert_handler) + await srv.load_certificate(cert) + await srv.load_private_key(key) + await srv.start() + yield srv + # stop the server + await srv.stop() + @pytest.fixture(params=srv_crypto_params) async def srv_crypto_all_certs(request): @@ -101,12 +127,14 @@ async def test_nocrypto(srv_no_crypto): async def test_nocrypto_fail(srv_no_crypto): clt = Client(uri_no_crypto) with pytest.raises(ua.UaError): - await clt.set_security_string(f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem") + await clt.set_security_string( + f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem") async def test_basic256(srv_crypto_all_certs): clt = Client(uri_crypto) - await clt.set_security_string(f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem") + await clt.set_security_string( + f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem") async with clt: assert await clt.nodes.objects.get_children() @@ -114,7 +142,7 @@ async def test_basic256(srv_crypto_all_certs): async def test_basic256_encrypt(srv_crypto_all_certs): clt = Client(uri_crypto) await clt.set_security_string( - f"Basic256Sha256,SignAndEncrypt,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem") + f"Basic256Sha256,SignAndEncrypt,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem") async with clt: assert await clt.nodes.objects.get_children() @@ -122,12 +150,12 @@ async def test_basic256_encrypt(srv_crypto_all_certs): async def test_basic256_encrypt_success(srv_crypto_all_certs): clt = Client(uri_crypto) await clt.set_security( - security_policies.SecurityPolicyBasic256Sha256, - f"{EXAMPLE_PATH}certificate-example.der", - f"{EXAMPLE_PATH}private-key-example.pem", - None, - ua.MessageSecurityMode.SignAndEncrypt - ) + security_policies.SecurityPolicyBasic256Sha256, + f"{EXAMPLE_PATH}certificate-example.der", + f"{EXAMPLE_PATH}private-key-example.pem", + None, + mode=ua.MessageSecurityMode.SignAndEncrypt + ) async with clt: assert await clt.nodes.objects.get_children() @@ -141,7 +169,7 @@ async def test_basic256_encrypt_fail(srv_crypto_all_certs): f"{EXAMPLE_PATH}certificate-example.der", f"{EXAMPLE_PATH}private-key-example.pem", None, - ua.MessageSecurityMode.None_ + mode=ua.MessageSecurityMode.None_ ) @@ -152,7 +180,21 @@ async def test_certificate_handling_success(srv_crypto_one_cert): peer_creds['certificate'], peer_creds['private_key'], None, - ua.MessageSecurityMode.SignAndEncrypt + mode=ua.MessageSecurityMode.SignAndEncrypt + ) + async with clt: + assert await clt.get_objects_node().get_children() + + +async def test_encrypted_private_key_handling_success(srv_crypto_encrypted_key_one_cert): + clt = Client(uri_crypto_cert) + await clt.set_security( + security_policies.SecurityPolicyBasic256Sha256, + encrypted_private_key_peer_creds['certificate'], + encrypted_private_key_peer_creds['private_key'], + encrypted_private_key_peer_creds['password'], + None, + mode=ua.MessageSecurityMode.SignAndEncrypt ) async with clt: assert await clt.get_objects_node().get_children() @@ -167,7 +209,23 @@ async def test_certificate_handling_failure(srv_crypto_one_cert): unauthorized_peer_creds['certificate'], unauthorized_peer_creds['private_key'], None, - ua.MessageSecurityMode.SignAndEncrypt + mode=ua.MessageSecurityMode.SignAndEncrypt + ) + async with clt: + assert await clt.get_objects_node().get_children() + + +async def test_encrypted_private_key_handling_failure(srv_crypto_one_cert): + clt = Client(uri_crypto_cert) + + with pytest.raises(TimeoutError): + await clt.set_security( + security_policies.SecurityPolicyBasic256Sha256, + unauthorized_peer_creds['certificate'], + unauthorized_peer_creds['private_key'], + None, # Pass None for an empty password to test incorrect password. + None, + mode=ua.MessageSecurityMode.SignAndEncrypt ) async with clt: assert await clt.get_objects_node().get_children() @@ -181,7 +239,7 @@ async def test_certificate_handling_mismatched_creds(srv_crypto_one_cert): peer_creds['certificate'], unauthorized_peer_creds['private_key'], None, - ua.MessageSecurityMode.SignAndEncrypt + mode=ua.MessageSecurityMode.SignAndEncrypt ) async with clt: assert await clt.get_objects_node().get_children()