similar.py
#
# method:
# - read in multiple JSON-L format SSP files
# - collate statements by control
# - for each control:
#   - construct a spaCy doc for each SSP's statement
#   - compute a similarity measure between each pair of docs
#   - build a graph showing which control statements are related
#
# assumptions:
# - all controls come from the same catalog
# - only one instance of a control per SSP (this should
#   be true, but it sometimes is not!)
#
import json
import textwrap
from collections import defaultdict

import click
import spacy

from graph import Graph
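# note: graph.Graph is a local helper module (not shown here); judging from the
# calls below, it is assumed to provide add_node(tag, payload),
# add_edge(tag_a, tag_b, weight), and components() returning groups of
# connected node tags.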

def add_controls(controls, tag, ssp_path):
    """Read one JSON-L SSP file and append its (tag, text) statements by control."""
    with open(ssp_path, "r") as ssp_file:
        for line in ssp_file:
            statement = json.loads(line)
            control_key = statement["control"]
            text = statement["text"]
            controls[control_key].append((tag, text))


def similarity_controls(nlp, controls, threshold):
    """Compute statement-level similarity components for every control."""
    components = {}
    for control_key in controls:
        components[control_key] = similarity_by_statement(
            nlp, controls[control_key], threshold
        )
    return components


def similarity_controls_sentences(nlp, controls, threshold):
    """Compute sentence-level similarity components for every control."""
    components = {}
    for control_key in controls:
        components[control_key] = similarity_by_sentence(
            nlp, controls[control_key], threshold
        )
    return components


def similarity_by_sentence(nlp, statements, threshold):
    """Split each statement into sentences, then compare the sentences pairwise."""
    docs = [nlp(text) for _, text in statements]
    tags = [tag for tag, _ in statements]
    s_docs = []
    s_tags = []
    for tag, doc in zip(tags, docs):
        for idx, sent in enumerate(doc.sents):
            s_tag = f"{tag}_{idx}"
            s_doc = nlp(sent.text)
            s_tags.append(s_tag)
            s_docs.append(s_doc)
    return similarity(s_tags, s_docs, threshold)


def similarity_by_statement(nlp, statements, threshold):
    """Compare whole statements pairwise."""
    docs = [nlp(text) for _, text in statements]
    tags = [tag for tag, _ in statements]
    return similarity(tags, docs, threshold)


def similarity(tags, docs, threshold):
    """Build a graph with an edge for each doc pair at or above the threshold.

    Returns the graph's connected components along with a tag -> doc lookup.
    """
    docs_by_tag = {tag: doc for tag, doc in zip(tags, docs)}
    # zero out the diagonal so a doc is never linked to itself
    matrix = [
        [0.0 if doc1 == doc2 else doc1.similarity(doc2) for doc2 in docs]
        for doc1 in docs
    ]
    g = Graph()
    for tag, doc in zip(tags, docs):
        g.add_node(tag, doc)
    for row_tag, row in zip(tags, matrix):
        for col_tag, sim in zip(tags, row):
            if sim >= threshold:
                g.add_edge(row_tag, col_tag, sim)
    return (g.components(), docs_by_tag)


def display(components):
    """Print each control's clusters of similar statements, with shortened text."""
    for control in sorted(components):
        print(f"Control {control}")
        docs_by_tag = components[control][1]
        for match in components[control][0]:
            sorted_matches = sorted(match)
            print(" [" + ", ".join(sorted_matches) + "]")
            for tag in sorted_matches:
                print(
                    f" {tag}:",
                    textwrap.shorten(
                        str(docs_by_tag[tag]), width=72, placeholder="..."
                    ),
                )
        print()


@click.command()
@click.option("--ssp", type=(str, click.Path(exists=True)), multiple=True)
@click.option("--threshold", type=float, default=0.95)
@click.option("--by", type=click.Choice(["statement", "sentence"]), default="statement")
def main(ssp, by, threshold):
    # each --ssp option supplies a (tag, path) pair; tags must be unique
    controls = defaultdict(list)
    tags = set()
    for tag, ssp_path in ssp:
        if tag in tags:
            raise click.ClickException("duplicate SSP tag {}".format(tag))
        tags.add(tag)
        add_controls(controls, tag, ssp_path)
    nlp = spacy.load("en_core_web_lg")
    if by == "statement":
        print("# Similarity by control statement\n")
        display(similarity_controls(nlp, controls, threshold))
    elif by == "sentence":
        print("\n# Similarity by sentence\n")
        display(similarity_controls_sentences(nlp, controls, threshold))


if __name__ == "__main__":
    main()