-
Notifications
You must be signed in to change notification settings - Fork 1
/
dcql.py
186 lines (165 loc) · 8.51 KB
/
dcql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
_required_in_credential = {
"id": None,
"format": None
}
def match_credental(credential, credential_store):
matched_credentials = []
for _item in _required_in_credential:
if _item not in credential:
raise RuntimeError(f"{_item} missing from credential")
_required_in_credential[_item] = credential.get(_item)
# check for optional params
meta = credential.get("meta", None)
claims = credential.get("claims", None)
claim_sets = credential.get("claim_sets", None)
if claims is None and claim_sets is not None:
raise RuntimeError("claim_set without claims")
# Filter by format
candidates = credential_store.get(_required_in_credential["format"], None)
# Bail if there are no credentials matching this format
if candidates is None:
return matched_credentials
# Filter by meta
if meta is not None:
# meta is intpreted on a per-format basis
if _required_in_credential["format"] == "mso_mdoc":
# Check for a doctype
doctype_value = meta.get("doctype_value", None)
if doctype_value is not None:
# Filter by doctype
candidates = candidates.get(doctype_value, None)
else:
# Unknown format. Can't parse meta, so bail
return matched_credentials
# Bail if there are no credentials after filtering on meta
if candidates is None:
return matched_credentials
# Match on the claims
if claims is None:
# Match every candidate
for candidate in candidates:
matched_credential = {}
matched_credential["id"] = candidate["id"]
matched_claim_names = []
# Matching logic is peformed on a per-format basis. This is a bit annoying
if _required_in_credential["format"] == "mso_mdoc":
for namespace, namespace_val in candidate["namespaces"].items():
for attr, attr_val in namespace_val.items():
matched_claim_names.append(attr_val["display"])
matched_credential["matched_claim_names"] = matched_claim_names
matched_credentials.append(matched_credential)
else:
if claim_sets is None:
# Match candidates that contain all claims requested
for candidate in candidates:
matched_credential = {}
matched_credential["id"] = candidate["id"]
matched_claim_names = []
for claim in claims:
claim_values = claim.get("values", None)
# Matching logic is peformed on a per-format basis. This is a bit annoying
if _required_in_credential["format"] == "mso_mdoc":
namespace = claim.get("namespace", None)
claim_name = claim.get("claim_name", None)
if namespace is None:
raise RuntimeError("namespace missing from an mso-mdoc claim")
if claim_name is None:
raise RuntimeError("claim_name missing from an mso-mdoc claim")
if namespace in candidate["namespaces"]:
if claim_name in candidate["namespaces"][namespace]:
if claim_values is not None:
for v in claim_values:
if v == candidate["namespaces"][namespace][claim_name]["value"]:
matched_claim_names.append(candidate["namespaces"][namespace][claim_name]["display"])
break
else:
matched_claim_names.append(candidate["namespaces"][namespace][claim_name]["display"])
matched_credential["matched_claim_names"] = matched_claim_names
if len(matched_claim_names) == len(claims):
matched_credentials.append(matched_credential)
else:
# Match the claim sets
for candidate in candidates:
matched_credential = {}
matched_credential["id"] = candidate["id"]
matched_claim_ids = {}
for claim in claims:
claim_values = claim.get("values", None)
claim_id = claim.get("id", None)
if claim_id is None:
raise RuntimeError(
"claim is missing ID when claim_set is present"
)
# Matching logic is peformed on a per-format basis. This is a bit annoying
if _required_in_credential["format"] == "mso_mdoc":
namespace = claim.get("namespace", None)
claim_name = claim.get("claim_name", None)
if namespace is None:
raise RuntimeError("namespace missing from an mso-mdoc claim")
if claim_name is None:
raise RuntimeError("claim_name missing from an mso-mdoc claim")
if namespace in candidate["namespaces"]:
if claim_name in candidate["namespaces"][namespace]:
if claim_values is not None:
for v in claim_values:
if v == candidate["namespaces"][namespace][claim_name]["value"]:
matched_claim_ids[claim_id] = candidate["namespaces"][namespace][claim_name]["display"]
break
else:
matched_claim_ids[claim_id] = candidate["namespaces"][namespace][claim_name]["display"]
for claim_set in claim_sets:
matched_claim_names = []
for c in claim_set:
if c in matched_claim_ids:
matched_claim_names.append(matched_claim_ids[c])
if len(matched_claim_names) == len(claim_set):
matched_credential["matched_claim_names"] = matched_claim_names
matched_credentials.append(matched_credential)
break
return matched_credentials
def dcql_query(query, credential_store):
matched_credentials = {}
candidate_matched_credentials = {}
credentials = query.get("credentials", None)
credential_sets = query.get("credential_sets", None)
if credentials is None:
raise RuntimeError("credentials missing")
for credential in credentials:
_required_in_credential["id"] = credential.get("id", None)
if _required_in_credential["id"] is None:
raise RuntimeError("id missing from credential")
matched = match_credental(credential, credential_store)
if len(matched) > 0:
candidate_matched_credentials[
_required_in_credential["id"]
] = matched
if credential_sets is None:
if len(credentials) == len(candidate_matched_credentials.keys()):
matched_credentials = candidate_matched_credentials
else:
matched_credential_set = {}
matched_credential_set_optional = {}
required_count = 0
for credential_set in credential_sets:
options = credential_set.get("options", None)
if options is None:
raise RuntimeError("options missing from credential_set")
required = credential_set.get("required", True)
if required:
required_count = required_count + 1
for option in options:
matched_options = []
for cred in option:
if cred in candidate_matched_credentials:
matched_options.append(cred)
if len(matched_options) == len(option):
# This option matched
for m in matched_options:
if required:
matched_credential_set[m] = candidate_matched_credentials[m]
else:
matched_credential_set_optional[m] = candidate_matched_credentials[m]
break
if required_count == len(matched_credential_set.keys()):
matched_credentials = matched_credential_set | matched_credential_set_optional
return matched_credentials