Skip to content

Commit

Permalink
Precheck supported TLS versions to reduce sslyze scan scope
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsasha committed Aug 27, 2024
1 parent 4701fb2 commit e983c31
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 29 deletions.
104 changes: 84 additions & 20 deletions checks/tasks/tls/scans.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
ServerScanResult,
TlsVersionEnum,
CipherSuite,
ServerTlsProbingResult,
ClientAuthRequirementEnum,
)
from sslyze.errors import ServerRejectedTlsHandshake, TlsHandshakeTimedOut
from sslyze.errors import ServerRejectedTlsHandshake, TlsHandshakeTimedOut, ConnectionToServerTimedOut
from sslyze.plugins.certificate_info._certificate_utils import (
parse_subject_alternative_name_extension,
get_common_names,
Expand Down Expand Up @@ -66,17 +68,29 @@
from internetnl import log

SSLYZE_SCAN_COMMANDS = {
ScanCommand.SSL_2_0_CIPHER_SUITES,
ScanCommand.SSL_3_0_CIPHER_SUITES,
ScanCommand.TLS_1_0_CIPHER_SUITES,
ScanCommand.TLS_1_1_CIPHER_SUITES,
ScanCommand.TLS_1_2_CIPHER_SUITES,
ScanCommand.TLS_1_3_CIPHER_SUITES,
ScanCommand.TLS_COMPRESSION,
ScanCommand.TLS_1_3_EARLY_DATA,
ScanCommand.SESSION_RENEGOTIATION,
ScanCommand.ELLIPTIC_CURVES,
}
SSLYZE_SCAN_COMMANDS_FOR_TLS = {
TlsVersionEnum.SSL_2_0: ScanCommand.SSL_2_0_CIPHER_SUITES,
TlsVersionEnum.SSL_3_0: ScanCommand.SSL_3_0_CIPHER_SUITES,
TlsVersionEnum.TLS_1_0: ScanCommand.TLS_1_0_CIPHER_SUITES,
TlsVersionEnum.TLS_1_1: ScanCommand.TLS_1_1_CIPHER_SUITES,
TlsVersionEnum.TLS_1_2: ScanCommand.TLS_1_2_CIPHER_SUITES,
TlsVersionEnum.TLS_1_3: ScanCommand.TLS_1_3_CIPHER_SUITES,
}
# Some code calls ServerConnectivityInfo.get_preconfigured_tls_connection
# before any other scans are done. This requires a ServerTLSProbingResult,
# which is never used due to explicit TLS version setting,
# so it's contents don't matter.
FAKE_SERVER_TLS_PROBING_RESULT = ServerTlsProbingResult(
highest_tls_version_supported=TlsVersionEnum.TLS_1_3,
cipher_suite_supported="",
client_auth_requirement=ClientAuthRequirementEnum.DISABLED,
supports_ecdh_key_exchange=True,
)

with open(settings.CA_FINGERPRINTS) as f:
root_fingerprints = f.read().splitlines()
Expand Down Expand Up @@ -462,26 +476,42 @@ def check_mail_tls_multiple(server_tuples, task) -> Dict[str, Dict[str, Any]]:
dane_cb_per_server = {}
results = {}
for server, dane_cb_data in server_tuples:
server_location = ServerNetworkLocation(hostname=server, port=25)
network_configuration = ServerNetworkConfiguration(
tls_server_name_indication=server,
tls_opportunistic_encryption=ProtocolWithOpportunisticTlsEnum.SMTP,
)
supported_tls_versions = check_supported_tls_versions(
ServerConnectivityInfo(
server_location=server_location,
network_configuration=network_configuration,
tls_probing_result=FAKE_SERVER_TLS_PROBING_RESULT,
)
)
scan_commands = SSLYZE_SCAN_COMMANDS | {
SSLYZE_SCAN_COMMANDS_FOR_TLS[tls_version] for tls_version in supported_tls_versions
}
log.info(f"==== precheck on {server_location} supports {supported_tls_versions} {scan_commands=}")

dane_cb_per_server[server] = dane_cb_data
scans.append(
ServerScanRequest(
server_location=ServerNetworkLocation(hostname=server, port=25),
network_configuration=ServerNetworkConfiguration(
tls_server_name_indication=server,
tls_opportunistic_encryption=ProtocolWithOpportunisticTlsEnum.SMTP,
),
scan_commands=SSLYZE_SCAN_COMMANDS,
server_location=server_location,
network_configuration=network_configuration,
scan_commands=scan_commands,
)
)
try:
for all_suites, result in run_sslyze(scans, connection_limit=1):
log.debug(
f"=========== sslyze complete for {result.server_location.hostname}, total set {dane_cb_per_server.keys()}"
f"=========== sslyze complete for {result.server_location.hostname}, "
f"total set {dane_cb_per_server.keys()}"
)
dane_cb_data = dane_cb_per_server[result.server_location.hostname]
results[result.server_location.hostname] = check_mail_tls(result, all_suites, dane_cb_data, task)
log.debug(
f"=========== check_mail_tls complete for {result.server_location.hostname}, total set {dane_cb_per_server.keys()}"
f"=========== check_mail_tls complete for {result.server_location.hostname}, "
f"total set {dane_cb_per_server.keys()}"
)
except TLSException as exc:
log.info(f"sslyze scan for mail failed: {exc}")
Expand Down Expand Up @@ -588,12 +618,27 @@ def check_web_tls(url, af_ip_pair=None, *args, **kwargs):
"""
Check the webserver's TLS configuration.
"""
server_location = ServerNetworkLocation(hostname=url, ip_address=af_ip_pair[1])
network_configuration = ServerNetworkConfiguration(
tls_server_name_indication=url, http_user_agent=settings.USER_AGENT
)
supported_tls_versions = check_supported_tls_versions(
ServerConnectivityInfo(
server_location=server_location,
network_configuration=network_configuration,
tls_probing_result=FAKE_SERVER_TLS_PROBING_RESULT,
)
)
scan_commands = (
SSLYZE_SCAN_COMMANDS
| {SSLYZE_SCAN_COMMANDS_FOR_TLS[tls_version] for tls_version in supported_tls_versions}
| {ScanCommand.CERTIFICATE_INFO}
)
log.info(f"==== precheck on {server_location} supports {supported_tls_versions} {scan_commands=}")
scan = ServerScanRequest(
server_location=ServerNetworkLocation(hostname=url, ip_address=af_ip_pair[1]),
network_configuration=ServerNetworkConfiguration(
tls_server_name_indication=url, http_user_agent=settings.USER_AGENT
),
scan_commands=SSLYZE_SCAN_COMMANDS | {ScanCommand.CERTIFICATE_INFO},
server_location=server_location,
network_configuration=network_configuration,
scan_commands=scan_commands,
scan_commands_extra_arguments=ScanCommandsExtraArguments(
certificate_info=CertificateInfoExtraArgument(custom_ca_file=Path(settings.CA_CERTIFICATES))
),
Expand Down Expand Up @@ -865,3 +910,22 @@ def _check_cipher_suite_available(tls_version: TlsVersionEnum, cipher_suite: Cip
return True
except ValueError:
return False


def check_supported_tls_versions(server_connectivity_info: ServerConnectivityInfo) -> List[TlsVersionEnum]:
supported_tls_versions = []
for tls_version in TlsVersionEnum:
requires_legacy_openssl = tls_version != TlsVersionEnum.TLS_1_3

ssl_connection = server_connectivity_info.get_preconfigured_tls_connection(
override_tls_version=tls_version, should_use_legacy_openssl=requires_legacy_openssl
)
try:
ssl_connection.connect()
supported_tls_versions.append(tls_version)
except (ServerRejectedTlsHandshake, TlsHandshakeTimedOut, ConnectionToServerTimedOut):
pass
finally:
ssl_connection.close()

return supported_tls_versions
12 changes: 3 additions & 9 deletions checks/tasks/tls/tasks_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,24 +776,18 @@ def do_mail_smtp_starttls(mailservers, url, task, *args, **kwargs):
# Pull in any cached results
cache_id = redis_id.mail_starttls.id.format(server)
results[server] = cache.get(cache_id, False)
log.debug(
f"=========== pulled {cache_id=} for {server=} data {results[server]}"
)
log.debug(f"=========== pulled {cache_id=} for {server=} data {results[server]}")
while timer() - start < cache_ttl and (not results or not all(results.values())):
servers_to_check = [
(server, dane_cb_data) for server, dane_cb_data, _ in mailservers if not results[server]
]
log.debug(
f"=========== checking remaining {servers_to_check=}"
)
log.debug(f"=========== checking remaining {servers_to_check=}")
results.update(check_mail_tls_multiple(servers_to_check, task))
time.sleep(1)
for server, server_result in results.items():
cache_id = redis_id.mail_starttls.id.format(server)
cache.set(cache_id, server_result, cache_ttl)
log.debug(
f"=========== writing to {cache_id=} for {server=}"
)
log.debug(f"=========== writing to {cache_id=} for {server=}")
if results[server] is False:
results[server] = dict(tls_enabled=False, could_not_test_smtp_starttls=True)
except SoftTimeLimitExceeded:
Expand Down

0 comments on commit e983c31

Please sign in to comment.