forked from microsoft/presidio-research
-
Notifications
You must be signed in to change notification settings - Fork 0
/
crf_model.py
104 lines (87 loc) · 3.1 KB
/
crf_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import pickle
from typing import List
from presidio_evaluator import InputSample
from presidio_evaluator.models import BaseModel
class CRFModel(BaseModel):
def __init__(
self,
model_pickle_path: str = "../models/crf.pickle",
entities_to_keep: List[str] = None,
verbose: bool = False,
):
super().__init__(
entities_to_keep=entities_to_keep,
verbose=verbose,
)
if model_pickle_path is None:
raise ValueError("model_pickle_path must be supplied")
with open(model_pickle_path, "rb") as f:
self.model = pickle.load(f)
def predict(self, sample: InputSample) -> List[str]:
tags = CRFModel.crf_predict(sample, self.model)
if self.entities:
tags = [tag for tag in tags if tag in self.entities]
if len(tags) != len(sample.tokens):
print("mismatch between previous tokens and new tokens")
# translated_tags = sample.rename_from_spacy_tags(tags)
return tags
@staticmethod
def crf_predict(sample, model):
sample.translate_input_sample_tags()
conll = sample.to_conll(translate_tags=True)
sentence = [(di["text"], di["pos"], di["label"]) for di in conll]
features = CRFModel.sent2features(sentence)
return model.predict([features])[0]
@staticmethod
def word2features(sent, i):
word = sent[i][0]
postag = sent[i][1]
features = {
"bias": 1.0,
"word.lower()": word.lower(),
"word[-3:]": word[-3:],
"word[-2:]": word[-2:],
"word.isupper()": word.isupper(),
"word.istitle()": word.istitle(),
"word.isdigit()": word.isdigit(),
"postag": postag,
"postag[:2]": postag[:2],
}
if i > 0:
word1 = sent[i - 1][0]
postag1 = sent[i - 1][1]
features.update(
{
"-1:word.lower()": word1.lower(),
"-1:word.istitle()": word1.istitle(),
"-1:word.isupper()": word1.isupper(),
"-1:postag": postag1,
"-1:postag[:2]": postag1[:2],
}
)
else:
features["BOS"] = True
if i < len(sent) - 1:
word1 = sent[i + 1][0]
postag1 = sent[i + 1][1]
features.update(
{
"+1:word.lower()": word1.lower(),
"+1:word.istitle()": word1.istitle(),
"+1:word.isupper()": word1.isupper(),
"+1:postag": postag1,
"+1:postag[:2]": postag1[:2],
}
)
else:
features["EOS"] = True
return features
@staticmethod
def sent2features(sent):
return [CRFModel.word2features(sent, i) for i in range(len(sent))]
@staticmethod
def sent2labels(sent):
return [label for token, postag, label in sent]
@staticmethod
def sent2tokens(sent):
return [token for token, postag, label in sent]