From 628a7784ed4037307d7c5688d5de6533c270cb75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20GUICHARD?= Date: Tue, 21 Dec 2021 15:53:53 +0100 Subject: [PATCH] test: add tests for credentials feature (#60) --- tests/context.py | 12 +- tests/test_cli.py | 281 ++++++++++++++++++++++++++++++++++- tests/test_config.py | 119 ++++++++++++++- tests/units/test_security.py | 97 ++++++++++++ tests/units/test_utils.py | 27 ++++ 5 files changed, 533 insertions(+), 3 deletions(-) create mode 100644 tests/units/test_security.py diff --git a/tests/context.py b/tests/context.py index 96aca67ba..0f9f3fe9d 100644 --- a/tests/context.py +++ b/tests/context.py @@ -32,7 +32,15 @@ from eodag.api.product.metadata_mapping import format_metadata from eodag.api.search_result import SearchResult from eodag.cli import download, eodag, list_pt, search_crunch -from eodag.config import load_default_config, merge_configs +from eodag.config import ( + load_default_config, + merge_configs, + get_provider_credentials, + load_providers_credentials, + override_credentials_from_mapping, + save_providers_credentials, + set_provider_credentials_key, +) from eodag.plugins.authentication.base import Authentication from eodag.plugins.crunch.filter_date import FilterDate from eodag.plugins.crunch.filter_latest_tpl_name import FilterLatestByName @@ -54,6 +62,7 @@ ProgressCallback, uri_to_path, ) +from eodag.utils.cli import ask_confirmation from eodag.utils.exceptions import ( AddressNotFound, AuthenticationError, @@ -65,5 +74,6 @@ UnsupportedProvider, ValidationError, ) +from eodag.utils.security import Crypto, CryptoKey from eodag.utils.stac_reader import fetch_stac_items from tests import TESTS_DOWNLOAD_PATH, TEST_RESOURCES_PATH diff --git a/tests/test_cli.py b/tests/test_cli.py index 161d69a77..0f32de2a8 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -18,6 +18,7 @@ import os import random +import tempfile import unittest from contextlib import contextmanager @@ -329,7 +330,7 @@ def test_eodag_search_with_cruncher(self, dag): api_obj.crunch.assert_called_with( search_results, search_criteria=criteria, - **{cruncher: {"minimum_overlap": "10"}} + **{cruncher: {"minimum_overlap": "10"}}, ) @mock.patch("eodag.cli.EODataAccessGateway", autospec=True) @@ -496,3 +497,281 @@ def test_eodag_download_wrongcredentials(self, download): self.assertEqual(result.exit_code, 1) self.assertIsInstance(result.exception, AuthenticationError) self.assertEqual(download.call_count, 1) + + def test_eodag_credentials_provider_not_found(self): + with tempfile.NamedTemporaryFile() as f: + result = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "notexists"] + ) + self.assertEqual(result.exit_code, -1) + + @mock.patch("getpass.getpass") + def test_eodag_credentials_set(self, getpass_mock): + getpass_mock.return_value = "mocked_value" + with tempfile.NamedTemporaryFile() as f: + result = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo"] + ) + self.assertEqual(result.exit_code, 0) + + @mock.patch("builtins.input") + def test_eodag_credentials_already_saved_not_override(self, input_mock): + input_mock.return_value = "n" + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + "apikey=abcd", + ], + ) + insert_creds2 = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(insert_creds2.exit_code, 1) + + @mock.patch("getpass.getpass") + @mock.patch("builtins.input") + def test_eodag_credentials_already_saved_override(self, input_mock, getpass_mock): + getpass_mock.return_value = "mocked_value" + input_mock.return_value = "y" + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + "apikey=abcd", + ], + ) + insert_creds2 = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(insert_creds2.exit_code, 0) + + def test_eodag_credentials_set_from_cli(self): + with tempfile.NamedTemporaryFile() as f: + result = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "peps", + "-y", + "-s", + "username=user1", + "-s", + "password=secret", + ], + ) + self.assertEqual(result.exit_code, 0) + + def test_eodag_credentials_set_from_cli_missing_field(self): + with tempfile.NamedTemporaryFile() as f: + result = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "peps", + "-y", + "-s", + "username=user1", + ], + ) + self.assertEqual(result.exit_code, -1) + + def test_eodag_credentials_list(self): + result = self.runner.invoke(eodag, ["credentials", "peps", "-l"]) + self.assertTrue("username" in result.stdout) + self.assertTrue("password" in result.stdout) + self.assertEqual(result.exit_code, 0) + + def test_eodag_credentials_exists(self): + with tempfile.NamedTemporaryFile() as f: + check_not_exists = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-e"] + ) + insert_creds = self.runner.invoke( + eodag, + ["credentials", "--creds", f.name, "sobloo", "-y", "-s", "apikey=abcd"], + ) + check_exists = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-e"] + ) + self.assertEqual(check_not_exists.exit_code, -1) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(check_exists.exit_code, 0) + + def test_eodag_credentials_read_no_credentials_found(self): + with tempfile.NamedTemporaryFile() as f: + result = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-ry"] + ) + self.assertEqual(result.exit_code, -1) + + def test_eodag_credentials_read_autoconfirm(self): + apikey = "abcd" + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + f"apikey={apikey}", + ], + ) + read_creds = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-ry"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(read_creds.exit_code, 0) + self.assertTrue(apikey in read_creds.stdout) + + @mock.patch("builtins.input") + def test_eodag_credentials_read_confirm(self, input_mock): + input_mock.return_value = "y" + apikey = "abcd" + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + f"apikey={apikey}", + ], + ) + read_creds = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-r"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(read_creds.exit_code, 0) + self.assertTrue(apikey in read_creds.stdout) + + @mock.patch("builtins.input") + def test_eodag_credentials_read_not_confirm(self, input_mock): + input_mock.return_value = "n" + apikey = "abcd" + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + f"apikey={apikey}", + ], + ) + read_creds = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-r"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(read_creds.exit_code, 1) + self.assertTrue(apikey not in read_creds.stdout) + + def test_eodag_credentials_delete_no_credentials_found(self): + with tempfile.NamedTemporaryFile() as f: + delete_creds = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-d"] + ) + self.assertEqual(delete_creds.exit_code, -1) + + def test_eodag_credentials_delete_autoconfirm(self): + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + "apikey=abcd", + ], + ) + delete_creds = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-dy"] + ) + check_exists = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-e"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(delete_creds.exit_code, 0) + self.assertEqual(check_exists.exit_code, -1) + + @mock.patch("builtins.input") + def test_eodag_credentials_delete_confirm(self, input_mock): + input_mock.return_value = "y" + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + "apikey=abcd", + ], + ) + delete_creds = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-d"] + ) + check_exists = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-e"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(delete_creds.exit_code, 0) + self.assertEqual(check_exists.exit_code, -1) + + @mock.patch("builtins.input") + def test_eodag_credentials_delete_not_confirm(self, input_mock): + input_mock.return_value = "n" + with tempfile.NamedTemporaryFile() as f: + insert_creds = self.runner.invoke( + eodag, + [ + "credentials", + "--creds", + f.name, + "sobloo", + "-y", + "-s", + "apikey=abcd", + ], + ) + delete_creds = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-d"] + ) + check_exists = self.runner.invoke( + eodag, ["credentials", "--creds", f.name, "sobloo", "-e"] + ) + self.assertEqual(insert_creds.exit_code, 0) + self.assertEqual(delete_creds.exit_code, 1) + self.assertEqual(check_exists.exit_code, 0) diff --git a/tests/test_config.py b/tests/test_config.py index 937eb9311..9f8bfe36f 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -17,13 +17,25 @@ # limitations under the License. import os +import pickle import tempfile import unittest from io import StringIO import yaml.parser -from tests.context import ValidationError, config, merge_configs +from tests.context import ( + Crypto, + CryptoKey, + ValidationError, + config, + get_provider_credentials, + load_providers_credentials, + merge_configs, + override_credentials_from_mapping, + save_providers_credentials, + set_provider_credentials_key, +) class TestProviderConfig(unittest.TestCase): @@ -426,3 +438,108 @@ def test_override_config_from_env(self): peps_conf = default_config["peps"] self.assertEqual(peps_conf.download.outputs_prefix, "/data") + + def test_get_provider_credentials(self): + secret_apikey = "test" + + provider1_stream = StringIO( + """!provider + name: provider1 + api: !plugin + type: MyPluginClass + """ + ) + provider1_config = yaml.load(provider1_stream, Loader=yaml.Loader) + provider1_credentials = get_provider_credentials(provider1_config) + self.assertTrue(isinstance(provider1_credentials, dict)) + self.assertTrue("apikey" not in provider1_credentials) + self.assertTrue(provider1_credentials.get("apikey") != secret_apikey) + + provider2_stream = StringIO( + f"""!provider + name: provider2 + api: !plugin + type: MyPluginClass + credentials: + apikey: {secret_apikey} + """ + ) + provider2_config = yaml.load(provider2_stream, Loader=yaml.Loader) + provider2_credentials = get_provider_credentials(provider2_config) + self.assertTrue(isinstance(provider2_credentials, dict)) + self.assertTrue("apikey" in provider2_credentials) + self.assertTrue(provider2_credentials.get("apikey") == secret_apikey) + + provider3_stream = StringIO( + f"""!provider + name: provider3 + auth: !plugin + type: MyPluginClass + credentials: + apikey: {secret_apikey} + """ + ) + provider3_config = yaml.load(provider3_stream, Loader=yaml.Loader) + provider3_credentials = get_provider_credentials(provider3_config) + self.assertTrue(isinstance(provider3_credentials, dict)) + self.assertTrue("apikey" in provider3_credentials) + self.assertTrue(provider3_credentials.get("apikey") == secret_apikey) + + def test_override_credentials_from_mapping(self): + crypto = Crypto() + fake_username = "user1" + fake_password = "passw" + fake_creds = { + "peps": { + "username": crypto.encrypt(fake_username), + "password": crypto.encrypt(fake_password), + }, + } + set_provider_credentials_key(fake_creds["peps"], crypto.key) + + default_config = config.load_default_config() + + fake_raw_cred = "raw" + default_config["sobloo"].auth.credentials["apikey"] = fake_raw_cred + + override_credentials_from_mapping(default_config, fake_creds) + + peps = get_provider_credentials(default_config["peps"]) + sobloo = get_provider_credentials(default_config["sobloo"]) + self.assertEqual(peps.get("username"), fake_username) + self.assertEqual(peps.get("password"), fake_password) + self.assertEqual(sobloo.get("apikey"), fake_raw_cred) + + def test_set_provider_credentials_key(self): + key = CryptoKey() + fake_provider_creds = {"apikey": "something"} + + count_before = len(fake_provider_creds) + set_provider_credentials_key(fake_provider_creds, key) + count_after = len(fake_provider_creds) + self.assertEqual(count_before + 1, count_after) + self.assertTrue(key.as_str() in fake_provider_creds.values()) + + def test_load_providers_credentials(self): + fake_creds = {"peps": {"apikey": "something"}} + with tempfile.NamedTemporaryFile() as tmpfile: + # File is not a pickle file, function returns {} + empty_loaded_creds = load_providers_credentials(tmpfile.name) + # We make it a pickle file with fake credentials data + pickle.dump(fake_creds, tmpfile) + tmpfile.flush() + # Then load the credentials + loaded_creds = load_providers_credentials(tmpfile.name) + self.assertTrue(isinstance(empty_loaded_creds, dict)) + self.assertTrue(isinstance(loaded_creds, dict)) + self.assertEqual(empty_loaded_creds, {}) + self.assertEqual(fake_creds, loaded_creds) + + def test_save_providers_credentials(self): + fake_creds = {"peps": {}} + with tempfile.NamedTemporaryFile() as tmpfile: + # File is empty because we wrote nothing inside + self.assertTrue(os.stat(tmpfile.name).st_size == 0) + save_providers_credentials(tmpfile.name, fake_creds) + # File is no longer empty because we wrote the credentials inside + self.assertTrue(os.stat(tmpfile.name).st_size != 0) diff --git a/tests/units/test_security.py b/tests/units/test_security.py new file mode 100644 index 000000000..22e3c1ab8 --- /dev/null +++ b/tests/units/test_security.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# Copyright 2021, CS GROUP - France, https://www.csgroup.eu/ +# +# This file is part of EODAG project +# https://www.github.com/CS-SI/EODAG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from tests.context import Crypto, CryptoKey + + +class TestCryptoKey(unittest.TestCase): + def test_crypto_key_default_build(self): + try: + key = CryptoKey() + self.assertTrue(isinstance(key.value, bytes)) + except Exception as e: + self.fail(f"Could not build eodag.utils.security.CryptoKey object: {e}") + + def test_crypto_key_build_with_str(self): + try: + strkey = "test" + key1 = CryptoKey(strkey=strkey) + key2 = CryptoKey.from_str(strkey) + self.assertTrue(isinstance(key1.value, bytes)) + self.assertTrue(isinstance(key2.value, bytes)) + self.assertEqual(key1, key2) + except Exception as e: + self.fail( + f"Could not build eodag.utils.security.CryptoKey object with strkey: {e}" + ) + + def test_crypto_key_build_with_bytes(self): + try: + byteskey = b"test" + key1 = CryptoKey(byteskey=byteskey) + key2 = CryptoKey.from_bytes(byteskey) + print(key1.value, key2.value) + self.assertTrue(isinstance(key1.value, bytes)) + self.assertTrue(isinstance(key2.value, bytes)) + self.assertEqual(key1, key2) + except Exception as e: + self.fail( + f"Could not build eodag.utils.security.CryptoKey object with byteskey: {e}" + ) + + def test_crypto_key_as_str(self): + try: + strkey = "test" + key = CryptoKey(strkey=strkey) + self.assertTrue(isinstance(key.value, bytes)) + self.assertEqual(key.as_str(), strkey) + except Exception as e: + self.fail( + f"Could not build eodag.utils.security.CryptoKey object with byteskey: {e}" + ) + + +class TestCrypto(unittest.TestCase): + def test_crypto_build(self): + key = CryptoKey() + strkey = key.as_str() + crypto1 = Crypto(key=key) + crypto2 = Crypto.from_key(key) + crypto3 = Crypto.from_strkey(strkey) + self.assertTrue(crypto1 == crypto2 == crypto3) + # When key is None, a new CryptoKey is built + crypto4 = Crypto() + crypto5 = Crypto(key=None) + self.assertIsNotNone(crypto4.key) + self.assertIsNotNone(crypto5.key) + # Generating the strkey from CryptoKey + # is the safe way of creating an strkey + # or you might get an error with another + # arbitrary str + with self.assertRaises(ValueError): + Crypto.from_strkey("test") + + def test_crypto_encrypt_decrypt(self): + crypto = Crypto() + text = "Hello World!" + encrypted = crypto.encrypt(text) + decrypted = crypto.decrypt(encrypted) + self.assertNotEqual(text, encrypted) + self.assertEqual(text, decrypted) diff --git a/tests/units/test_utils.py b/tests/units/test_utils.py index 62a5b7133..fdd610a84 100644 --- a/tests/units/test_utils.py +++ b/tests/units/test_utils.py @@ -16,14 +16,17 @@ # See the License for the specific language governing permissions and # limitations under the License. +import builtins import sys import unittest from contextlib import closing from datetime import datetime from io import StringIO +from unittest.mock import patch from tests.context import ( ProgressCallback, + ask_confirmation, get_timestamp, merge_mappings, path_to_uri, @@ -152,3 +155,27 @@ def test_merge_mappings(self): mapping = {"keyA": True} merge_mappings(mapping, {"keya": "bar"}) self.assertEqual(mapping, {"keyA": True}) + + def test_ask_confirmation_answer_y(self): + """The method :meth:`~eodag.utils.cli.ask_confirmation` returns True if input is 'y'""" + with patch.object(builtins, "input", lambda _: "y"): + confirm = ask_confirmation("test ?") + self.assertTrue(confirm) + + def test_ask_confirmation_answer_n(self): + """The method :meth:`~eodag.utils.cli.ask_confirmation` returns False if input is 'n'""" + with patch.object(builtins, "input", lambda _: "n"): + confirm = ask_confirmation("test ?") + self.assertFalse(confirm) + + def test_ask_confirmation_answer_something_else(self): + """The method :meth:`~eodag.utils.cli.ask_confirmation` returns False if input response is neither 'y'/'n'""" + with patch.object(builtins, "input", lambda _: "something else"): + confirm = ask_confirmation("test ?") + self.assertFalse(confirm) + + def test_ask_confirmation_no_answer(self): + """The method :meth:`~eodag.utils.cli.ask_confirmation` returns False if input response is empty""" + with patch.object(builtins, "input", lambda _: ""): + confirm = ask_confirmation("test ?") + self.assertFalse(confirm)