from collections import Counter
from typing import List, Optional, Dict

import numpy as np
from tqdm import tqdm

from presidio_evaluator import InputSample
from presidio_evaluator.evaluation import EvaluationResult, ModelError
from presidio_evaluator.models import BaseModel, PresidioAnalyzerWrapper


class Evaluator:
    def __init__(
        self,
        model: BaseModel,
        verbose: bool = False,
        compare_by_io=True,
        entities_to_keep: Optional[List[str]] = None,
    ):
        """
        Evaluate a PII detection model or a Presidio analyzer / recognizer.

        :param model: Instance of a fitted model (of base type BaseModel)
        :param compare_by_io: True if comparison should be done on the entity
        level and not the sub-entity level
        :param entities_to_keep: List of entity names to focus the evaluator on
        (and ignore the rest). Default is None = all entities.
        If the provided model has a list of entities to keep,
        that list will be used for evaluation.
        """
        self.model = model
        self.verbose = verbose
        self.compare_by_io = compare_by_io
        self.entities_to_keep = entities_to_keep
        if self.entities_to_keep is None and self.model.entities:
            self.entities_to_keep = self.model.entities
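
    # Note (illustrative example, not from the original module): with
    # entities_to_keep=["PERSON", "EMAIL_ADDRESS"], compare() treats any other
    # annotated or predicted entity type as "O" (see _adjust_per_entities).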
    def compare(self, input_sample: InputSample, prediction: List[str]):
        """
        Compares ground truth tags (annotation) and predicted tags (prediction).

        :param input_sample: input sample containing a list of tags
        with scheme self.labeling_scheme
        :param prediction: predicted value for each token
        """
        annotation = input_sample.tags
        tokens = input_sample.tokens

        if len(annotation) != len(prediction):
            print(
                "Annotation and prediction do not have the "
                "same length. Sample={}".format(input_sample)
            )
            return Counter(), []

        results = Counter()
        mistakes = []

        new_annotation = annotation.copy()

        if self.compare_by_io:
            new_annotation = self._to_io(new_annotation)
            prediction = self._to_io(prediction)

        # Ignore annotations that aren't in the list of requested entities.
        if self.entities_to_keep:
            prediction = self._adjust_per_entities(prediction)
            new_annotation = self._adjust_per_entities(new_annotation)

        for i in range(0, len(new_annotation)):
            results[(new_annotation[i], prediction[i])] += 1

            if self.verbose:
                print("Annotation:", new_annotation[i])
                print("Prediction:", prediction[i])
                print(results)

            # Check if there was an error and, if so, record whether it is a
            # false negative, a false positive or a wrong entity type.
            is_error = new_annotation[i] != prediction[i]
            if is_error:
                if prediction[i] == "O":
                    mistakes.append(
                        ModelError(
                            "FN",
                            new_annotation[i],
                            prediction[i],
                            tokens[i],
                            input_sample.full_text,
                            input_sample.metadata,
                        )
                    )
                elif new_annotation[i] == "O":
                    mistakes.append(
                        ModelError(
                            "FP",
                            new_annotation[i],
                            prediction[i],
                            tokens[i],
                            input_sample.full_text,
                            input_sample.metadata,
                        )
                    )
                else:
                    mistakes.append(
                        ModelError(
                            "Wrong entity",
                            new_annotation[i],
                            prediction[i],
                            tokens[i],
                            input_sample.full_text,
                            input_sample.metadata,
                        )
                    )

        return results, mistakes
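
    # Illustrative sketch (not part of the original module): for a sample whose
    # gold tags are ["O", "PERSON", "PERSON"] and whose predictions are
    # ["O", "PERSON", "O"], compare() returns
    # Counter({("O", "O"): 1, ("PERSON", "PERSON"): 1, ("PERSON", "O"): 1})
    # plus a single "FN" ModelError for the missed PERSON token.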

    def _adjust_per_entities(self, tags):
        if self.entities_to_keep:
            return [tag if tag in self.entities_to_keep else "O" for tag in tags]
        return tags

    @staticmethod
    def _to_io(tags):
        """
        Translates BILOU/BIO/IOB to IO - only In or Out of entity.
        ['B-PERSON', 'I-PERSON', 'L-PERSON'] is translated into
        ['PERSON', 'PERSON', 'PERSON']

        :param tags: the input tags in BILOU/IOB/BIO format
        :return: a new list of IO tags
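
        For example (illustrative):

        >>> Evaluator._to_io(["B-PERSON", "I-PERSON", "L-PERSON", "O"])
        ['PERSON', 'PERSON', 'PERSON', 'O']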
"""
return [tag[2:] if "-" in tag else tag for tag in tags]

    def evaluate_sample(
        self, sample: InputSample, prediction: List[str]
    ) -> EvaluationResult:
        if self.verbose:
            print("Input sentence: {}".format(sample.full_text))

        results, mistakes = self.compare(input_sample=sample, prediction=prediction)
        return EvaluationResult(results, mistakes, sample.full_text)

    def evaluate_all(self, dataset: List[InputSample]) -> List[EvaluationResult]:
        evaluation_results = []
        for sample in tqdm(dataset, desc="Evaluating {}".format(self.__class__)):
            prediction = self.model.predict(sample)
            evaluation_result = self.evaluate_sample(
                sample=sample, prediction=prediction
            )
            evaluation_results.append(evaluation_result)

        return evaluation_results

    @staticmethod
    def align_entity_types(
        input_samples: List[InputSample],
        entities_mapping: Dict[str, str] = None,
        allow_missing_mappings: bool = False,
    ) -> List[InputSample]:
        """
        Change input samples to conform with Presidio's entities.

        :return: new list of InputSample
        """
        new_input_samples = input_samples.copy()

        # A list that will contain the updated input samples
        new_list = []

        # Iterate on all samples
        for input_sample in new_input_samples:
            contains_field_in_mapping = False
            new_spans = []
            # Update spans to match the entity types in the values of entities_mapping
            for span in input_sample.spans:
                if span.entity_type in entities_mapping.keys():
                    new_name = entities_mapping.get(span.entity_type)
                    span.entity_type = new_name
                    contains_field_in_mapping = True

                    new_spans.append(span)
                else:
                    if not allow_missing_mappings:
                        raise ValueError(
                            f"Key {span.entity_type} cannot be found "
                            f"in the provided entities_mapping"
                        )
            input_sample.spans = new_spans

            # Update tags in case this sample has relevant entities for evaluation
            if contains_field_in_mapping:
                for i, tag in enumerate(input_sample.tags):
                    has_prefix = "-" in tag
                    if has_prefix:
                        prefix = tag[:2]
                        clean = tag[2:]
                    else:
                        prefix = ""
                        clean = tag

                    if clean in entities_mapping.keys():
                        new_name = entities_mapping.get(clean)
                        input_sample.tags[i] = "{}{}".format(prefix, new_name)
                    else:
                        input_sample.tags[i] = "O"

            new_list.append(input_sample)

        return new_list
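
    # Illustrative sketch (assumed mapping, not part of the original module):
    # align_entity_types(samples, {"PER": "PERSON", "LOC": "LOCATION"})
    # renames matching span entity types and rewrites tags, e.g. "B-PER" becomes
    # "B-PERSON", while tags whose type is not in the mapping are set to "O".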

    def calculate_score(
        self,
        evaluation_results: List[EvaluationResult],
        entities: Optional[List[str]] = None,
        beta: float = 2.5,
    ) -> EvaluationResult:
        """
        Returns the pii_precision, pii_recall and f measure, both per entity type
        and for all PII entities combined.

        :param evaluation_results: List of EvaluationResult
        :param entities: List of entities to calculate the score for.
        Default is None: all entities
        :param beta: F measure beta value (beta > 1 gives more weight to recall
        than to precision)
        :return: EvaluationResult with precision, recall and f measures
        """

        # aggregate results
        all_results = sum([er.results for er in evaluation_results], Counter())

        # compute pii_recall and pii_precision per entity
        entity_recall = {}
        entity_precision = {}
        if not entities:
            entities = list(set([x[0] for x in all_results.keys() if x[0] != "O"]))

        for entity in entities:
            # all annotations of a given type
            annotated = sum([all_results[x] for x in all_results if x[0] == entity])
            predicted = sum([all_results[x] for x in all_results if x[1] == entity])
            tp = all_results[(entity, entity)]

            if annotated > 0:
                entity_recall[entity] = tp / annotated
            else:
                entity_recall[entity] = np.nan

            if predicted > 0:
                per_entity_tp = all_results[(entity, entity)]
                entity_precision[entity] = per_entity_tp / predicted
            else:
                entity_precision[entity] = np.nan

        # compute pii_precision and pii_recall over all entities
        annotated_all = sum([all_results[x] for x in all_results if x[0] != "O"])
        predicted_all = sum([all_results[x] for x in all_results if x[1] != "O"])

        if annotated_all > 0:
            pii_recall = (
                sum(
                    [
                        all_results[x]
                        for x in all_results
                        if (x[0] != "O" and x[1] != "O")
                    ]
                )
                / annotated_all
            )
        else:
            pii_recall = np.nan

        if predicted_all > 0:
            pii_precision = (
                sum(
                    [
                        all_results[x]
                        for x in all_results
                        if (x[0] != "O" and x[1] != "O")
                    ]
                )
                / predicted_all
            )
        else:
            pii_precision = np.nan

        # compute pii_f_beta score
        pii_f_beta = self.f_beta(pii_precision, pii_recall, beta)

        # aggregate errors
        errors = []
        for res in evaluation_results:
            if res.model_errors:
                errors.extend(res.model_errors)

        evaluation_result = EvaluationResult(results=all_results, model_errors=errors)
        evaluation_result.pii_precision = pii_precision
        evaluation_result.pii_recall = pii_recall
        evaluation_result.entity_recall_dict = entity_recall
        evaluation_result.entity_precision_dict = entity_precision
        evaluation_result.pii_f = pii_f_beta

        return evaluation_result
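
    # Worked example (illustrative, not part of the original module): if the
    # aggregated counter is
    #   Counter({("PERSON", "PERSON"): 8, ("PERSON", "O"): 2, ("O", "PERSON"): 1}),
    # then entity_recall["PERSON"] = 8 / 10 = 0.8 and
    # entity_precision["PERSON"] = 8 / 9 ≈ 0.889; pii_recall and pii_precision
    # take the same values here because PERSON is the only entity type present.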

    @staticmethod
    def precision(tp: int, fp: int) -> float:
        return tp / (tp + fp + 1e-100)

    @staticmethod
    def recall(tp: int, fn: int) -> float:
        return tp / (tp + fn + 1e-100)

    @staticmethod
    def f_beta(precision: float, recall: float, beta: float) -> float:
        """
        Returns the F score for precision, recall and a beta parameter.

        :param precision: a float with the precision value
        :param recall: a float with the recall value
        :param beta: a float with the beta parameter of the F measure,
        which gives more or less weight to precision vs. recall
        :return: a float value of the f(beta) measure.
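
        For example (illustrative):

        >>> round(Evaluator.f_beta(precision=0.5, recall=1.0, beta=2.5), 3)
        0.879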
"""
if np.isnan(precision) or np.isnan(recall) or (precision == 0 and recall == 0):
return np.nan
return ((1 + beta ** 2) * precision * recall) / (
((beta ** 2) * precision) + recall
)
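

# Minimal usage sketch (illustrative only; assumes a fitted model wrapper such as
# PresidioAnalyzerWrapper with its default configuration, and a list of
# InputSample objects named `dataset`):
#
#     model = PresidioAnalyzerWrapper()
#     evaluator = Evaluator(model=model)
#     evaluation_results = evaluator.evaluate_all(dataset)
#     scores = evaluator.calculate_score(evaluation_results)
#     print(scores.pii_precision, scores.pii_recall, scores.pii_f)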