Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support validating LWCA certmonger requests #308

Merged
merged 3 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ipahealthcheck/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys
import traceback
import warnings
from ipalib import api

from datetime import datetime, timezone

Expand Down Expand Up @@ -464,6 +465,7 @@ def run_healthcheck(self):
return 1

results = exclude_keys(config, results)
api.Backend.ldap2.disconnect()

try:
output.render(results)
Expand Down
68 changes: 59 additions & 9 deletions src/ipahealthcheck/ipa/certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
logger = logging.getLogger()
DAY = 60 * 60 * 24

expected_requests = [] # to cache the expected certmonger requests


def is_ipa_issued_cert(myapi, cert):
"""Thin wrapper around certs.is_ipa_issued to test for LDAP"""
Expand All @@ -44,7 +46,7 @@ def is_ipa_issued_cert(myapi, cert):
return certs.is_ipa_issued_cert(myapi, cert)


def get_expected_requests(ca, ds, serverid):
def get_expected_requests(ca, ds, serverid, conn):
"""Provide the expected certmonger tracking request data

This list is based in part on certificate_renewal_update() in
Expand All @@ -54,10 +56,15 @@ def get_expected_requests(ca, ds, serverid):
The list is filtered depending on whether a CA is running
and the certificates have been issued by IPA.

:param ca: the CAInstance
:param ds: the DSInstance
:param serverid: the DS serverid name
:param ca: the CAInstance
:param ds: the DSInstance
:param serverid: the DS serverid name
"""
global expected_requests

if expected_requests:
return expected_requests

template = paths.CERTMONGER_COMMAND_TEMPLATE

if api.Command.ca_is_enabled()['result']:
Expand Down Expand Up @@ -100,6 +107,7 @@ def get_expected_requests(ca, ds, serverid):
if token and token != 'internal':
req['key-token'] = token
requests.append(req)
requests.extend(get_lwca_requests(conn))
else:
logger.debug('CA is not configured, skipping CA tracking')

Expand Down Expand Up @@ -174,9 +182,49 @@ def get_expected_requests(ca, ds, serverid):
}
)

expected_requests = requests

return requests


def get_lwca_requests(conn):
"""
Retrieve the known LWCA certificates.
"""
lwca_requests = []
try:
entries, truncated = conn.find_entries(
base_dn=DN(api.env.container_ca, api.env.basedn),
filter="(&(objectclass=ipaca)(!(cn=ipa)))",
)
except (errors.EmptyResult, errors.NotFound) as e:
logger.debug("Search for LWCA entries result: %s", e)
return lwca_requests

template = paths.CERTMONGER_COMMAND_TEMPLATE
for entry in entries:
try:
id = entry.get('ipacaid')[0]
except Exception as e:
logger.debug("Malformed LDAP entry, unable to get LWCA id %s: %s",
entry.get('dn'), e)
continue
nickname = f'caSigningCert cert-pki-ca {id}'

req = {
'cert-database': paths.PKI_TOMCAT_ALIAS_DIR,
'cert-nickname': nickname,
'ca-name': RENEWAL_CA_NAME,
'cert-presave-command': template % 'stop_pkicad',
'cert-postsave-command':
(template % 'renew_ca_cert "{}"'.format(nickname)),
'template-profile': 'caCACert',
}
lwca_requests.append(req)

return lwca_requests


def expected_token(token_name, certmonger_token):
"""The value is stored in two places, do some sanity checking"""
if token_name != certmonger_token:
Expand Down Expand Up @@ -426,7 +474,8 @@ class IPACertTracking(IPAPlugin):

@duration
def check(self):
requests = get_expected_requests(self.ca, self.ds, self.serverid)
requests = get_expected_requests(self.ca, self.ds, self.serverid,
self.conn)
cm = certmonger._certmonger()

ids = []
Expand Down Expand Up @@ -495,7 +544,8 @@ class IPACertDNSSAN(IPAPlugin):
@duration
def check(self):
fqdn = socket.getfqdn()
requests = get_expected_requests(self.ca, self.ds, self.serverid)
requests = get_expected_requests(self.ca, self.ds, self.serverid,
self.conn)

for request in requests:
request_id = certmonger.get_request_id(request)
Expand Down Expand Up @@ -1225,7 +1275,8 @@ def check(self):
if not self.ca.is_configured():
logger.debug('CA is not configured, skipping revocation check')
return
requests = get_expected_requests(self.ca, self.ds, self.serverid)
requests = get_expected_requests(self.ca, self.ds, self.serverid,
self.conn)
pwd_file = get_token_password_file(self.ca.hsm_enabled,
self.ca.token_name)
for request in requests:
Expand Down Expand Up @@ -1297,8 +1348,7 @@ def check(self):

# Now we have the cert either way, check the recovation
try:
result = api.Command.cert_show(cert.serial_number,
all=True)
result = api.Command.cert_show(cert.serial_number)
except Exception as e:
yield Result(self, constants.ERROR,
key=id,
Expand Down
18 changes: 16 additions & 2 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,35 @@ def _make_facts(configured=None):
return _make_facts


@pytest.mark.xfail(
reason="https://github.com/freeipa/freeipa-healthcheck/issues/309"
)
def test_ipa_notinstalled(python_ipalib_dir, monkeypatch):
"""
Test ipa-healthcheck handles the missing IPA stuff
"""
monkeypatch.setenv("PYTHONPATH", python_ipalib_dir(configured=None))
output = run(["ipa-healthcheck"], raiseonerr=False, env=os.environ)
assert output.returncode == 1
assert "IPA server is not installed" in output.raw_output.decode("utf-8")
assert "IPA server is not installed" in output.raw_output.decode(
"utf-8"
) or "IPA server is not installed" in output.raw_error_output.decode(
"utf-8"
)


@pytest.mark.xfail(
reason="https://github.com/freeipa/freeipa-healthcheck/issues/309"
)
def test_ipa_unconfigured(python_ipalib_dir, monkeypatch):
"""
Test ipa-healthcheck handles the unconfigured IPA server
"""
monkeypatch.setenv("PYTHONPATH", python_ipalib_dir(configured=False))
output = run(["ipa-healthcheck"], raiseonerr=False, env=os.environ)
assert output.returncode == 1
assert "IPA server is not configured" in output.raw_output.decode("utf-8")
assert "IPA server is not configured" in output.raw_output.decode(
"utf-8"
) or "IPA server is not configured" in output.raw_error_output.decode(
"utf-8"
)