From c6e55d43200db2e89e6d264a454dcc801cc70d77 Mon Sep 17 00:00:00 2001 From: rwxd Date: Wed, 27 Nov 2024 13:09:02 +0100 Subject: [PATCH] feat(records): allow records with regex --- README.md | 16 +++++ powerdns_api_proxy/config.py | 10 ++- powerdns_api_proxy/models.py | 6 +- powerdns_api_proxy/utils.py | 5 ++ tests/unit/config_test.py | 125 +++++++++++++++++++++++++++++++++-- 5 files changed, 154 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 2015b78..d4fef2b 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,22 @@ environments: - "test.example.com" ``` +###### Regex + +Additionally to the `records` list a `regex_records` list can be defined. +In this list regex can be to define, which records are allowed. + +```yaml +... +environments: + - name: "Test1" + ... + zones: + - name: "example.com" + regex_records: + - "_acme-challenge.service-.*.example.com" +``` + ##### Services Under a `zone` `services` can be defined. diff --git a/powerdns_api_proxy/config.py b/powerdns_api_proxy/config.py index 54969ce..a296415 100644 --- a/powerdns_api_proxy/config.py +++ b/powerdns_api_proxy/config.py @@ -19,7 +19,7 @@ RRSETRequest, ZoneNotAllowedException, ) -from powerdns_api_proxy.utils import check_zones_equal +from powerdns_api_proxy.utils import check_record_in_regex, check_zones_equal @lru_cache(maxsize=1) @@ -147,10 +147,18 @@ def check_rrset_allowed(zone: ProxyConfigZone, rrset: RRSET) -> bool: if zone.all_records: return True + if not rrset['name'].rstrip('.').endswith(zone.name.rstrip('.')): + logger.debug('RRSET not allowed, because zone does not match') + return False + for record in zone.records: if check_zones_equal(rrset['name'], record): return True + for regex in zone.regex_records: + if check_record_in_regex(rrset['name'], regex): + return True + if check_acme_record_allowed(zone, rrset): return True diff --git a/powerdns_api_proxy/models.py b/powerdns_api_proxy/models.py index efe6425..99388a2 100644 --- a/powerdns_api_proxy/models.py +++ b/powerdns_api_proxy/models.py @@ -19,7 +19,10 @@ class ProxyConfigServices(BaseModel): class ProxyConfigZone(BaseModel): ''' `name` is the zone name. + `description` is a description of the zone. `regex` should be set to `True` if `name` is a regex. + `records` is a list of record names that are allowed. + `regex_records` is a list of record regexes that are allowed. `admin` enabled creating and deleting the zone. `subzones` sets the same permissions on all subzones. `all_records` will be set to `True` if no `records` are defined. @@ -30,6 +33,7 @@ class ProxyConfigZone(BaseModel): regex: bool = False description: str = '' records: list[str] = [] + regex_records: list[str] = [] services: ProxyConfigServices = ProxyConfigServices(acme=False) admin: bool = False subzones: bool = False @@ -38,7 +42,7 @@ class ProxyConfigZone(BaseModel): def __init__(self, **data): super().__init__(**data) - if len(self.records) == 0: + if len(self.records) == 0 and len(self.regex_records) == 0: logger.debug( f'Setting all_records to True for zone {self.name}, because no records are defined' ) diff --git a/powerdns_api_proxy/utils.py b/powerdns_api_proxy/utils.py index 17ca783..8a2a261 100644 --- a/powerdns_api_proxy/utils.py +++ b/powerdns_api_proxy/utils.py @@ -26,6 +26,11 @@ def check_zone_in_regex(zone: str, regex: str) -> bool: return re.match(regex, zone.rstrip('.')) is not None +def check_record_in_regex(record: str, regex: str) -> bool: + '''Checks if record is in regex''' + return re.match(regex, record.rstrip('.')) is not None + + def check_zones_equal(zone1: str, zone2: str) -> bool: '''Checks if zones equal with or without trailing dot''' return zone1.rstrip('.') == zone2.rstrip('.') diff --git a/tests/unit/config_test.py b/tests/unit/config_test.py index 5c22d3a..593dbdc 100644 --- a/tests/unit/config_test.py +++ b/tests/unit/config_test.py @@ -281,9 +281,9 @@ def test_check_rrset_not_allowed_single_entries(): ], ) for item in [ - 'entry1.test-zone.example.com.', - 'entry2.entry1.test-zone.example.com', - 'test-zone.example.com.', + 'entry100.test-zone.example.com.', + 'entry200.entry1.test-zone.example.com', + 'test-record.example.com.', ]: rrset: RRSET = { 'name': item, @@ -293,7 +293,7 @@ def test_check_rrset_not_allowed_single_entries(): 'records': [], 'comments': [], } - assert check_rrset_allowed(zone, rrset) + assert not check_rrset_allowed(zone, rrset) def test_check_rrsets_request_allowed_no_raise(): @@ -348,8 +348,8 @@ def test_check_rrsets_request_allowed_raise(): ) with pytest.raises(HTTPException) as err: ensure_rrsets_request_allowed(zone, request) - assert err.value.status_code == 403 - assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed' + assert err.value.status_code == 403 + assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed' def test_check_rrsets_request_not_allowed_read_only(): @@ -378,6 +378,119 @@ def test_check_rrsets_request_not_allowed_read_only(): assert err.value.detail == 'RRSET update not allowed with read only token' +def test_rrset_request_not_allowed_regex_empty(): + zone = ProxyConfigZone( + name='test-zone.example.com.', + regex_records=[], + ) + request: RRSETRequest = {'rrsets': []} + assert ensure_rrsets_request_allowed(zone, request) + + +def test_rrset_request_allowed_all_regex(): + zone = ProxyConfigZone( + name='test-zone.example.com.', + regex_records=[ + '.*', + ], + ) + request: RRSETRequest = {'rrsets': []} + for item in [ + 'entry1.test-zone.example.com.', + 'entry2.entry1.test-zone.example.com', + ]: + request['rrsets'].append( + { + 'name': item, + 'type': 'TXT', + 'changetype': 'REPLACE', + 'ttl': 3600, + 'records': [], + 'comments': [], + } + ) + assert ensure_rrsets_request_allowed(zone, request) + + +def test_rrset_request_allowed_acme_regex(): + zone = ProxyConfigZone( + name='test-zone.example.com.', + regex_records=[ + '_acme-challenge.example.*.test-zone.example.com', + ], + ) + request: RRSETRequest = {'rrsets': []} + for item in [ + '_acme-challenge.example-entry.test-zone.example.com.', + ]: + request['rrsets'].append( + { + 'name': item, + 'type': 'TXT', + 'changetype': 'REPLACE', + 'ttl': 3600, + 'records': [], + 'comments': [], + } + ) + assert ensure_rrsets_request_allowed(zone, request) + + +def test_rrset_request_not_allowed_false_regex(): + zone = ProxyConfigZone( + name='test-zone.example.com.', + regex_records=[ + 'example.*.test-zone.example.com', + ], + ) + request: RRSETRequest = {'rrsets': []} + for item in [ + 'entry1.test-zone.example.com.', + 'entry2.entry1.test-zone.example.com', + ]: + request['rrsets'].append( + { + 'name': item, + 'type': 'TXT', + 'changetype': 'REPLACE', + 'ttl': 3600, + 'records': [], + 'comments': [], + } + ) + with pytest.raises(HTTPException) as err: + ensure_rrsets_request_allowed(zone, request) + assert err.value.status_code == 403 + assert err.value.detail == 'RRSET entry1.test-zone.example.com. not allowed' + + +def test_rrset_request_not_allowed_false_zone(): + zone = ProxyConfigZone( + name='test-zone.example.com.', + regex_records=[ + 'example.*.test-zone2.example.com', + ], + ) + request: RRSETRequest = {'rrsets': []} + for item in [ + 'example1.test-zone2.example.com.', + ]: + request['rrsets'].append( + { + 'name': item, + 'type': 'TXT', + 'changetype': 'REPLACE', + 'ttl': 3600, + 'records': [], + 'comments': [], + } + ) + with pytest.raises(HTTPException) as err: + ensure_rrsets_request_allowed(zone, request) + assert err.value.status_code == 403 + assert err.value.detail == 'RRSET example1.test-zone2.example.com. not allowed' + + def test_check_acme_record_allowed_all_records(): zone = ProxyConfigZone(name='test-zone.example.com', all_records=True) rrset = RRSET(