diff --git a/presidio-analyzer/presidio_analyzer/__init__.py b/presidio-analyzer/presidio_analyzer/__init__.py index eb5050d9d..68d456761 100644 --- a/presidio-analyzer/presidio_analyzer/__init__.py +++ b/presidio-analyzer/presidio_analyzer/__init__.py @@ -3,12 +3,14 @@ import logging from presidio_analyzer.pattern import Pattern +from presidio_analyzer.improvable_pattern import ImprovablePattern from presidio_analyzer.analysis_explanation import AnalysisExplanation from presidio_analyzer.recognizer_result import RecognizerResult from presidio_analyzer.dict_analyzer_result import DictAnalyzerResult from presidio_analyzer.entity_recognizer import EntityRecognizer from presidio_analyzer.local_recognizer import LocalRecognizer from presidio_analyzer.pattern_recognizer import PatternRecognizer +from presidio_analyzer.improvable_pattern_recognizer import ImprovablePatternRecognizer from presidio_analyzer.remote_recognizer import RemoteRecognizer from presidio_analyzer.recognizer_registry import RecognizerRegistry from presidio_analyzer.analyzer_engine import AnalyzerEngine @@ -17,7 +19,6 @@ from presidio_analyzer.context_aware_enhancers import ContextAwareEnhancer from presidio_analyzer.context_aware_enhancers import LemmaContextAwareEnhancer - # Define default loggers behavior # 1. 
presidio_analyzer logger @@ -36,6 +37,7 @@ decision_process_logger.setLevel("INFO") __all__ = [ "Pattern", + "ImprovablePattern", "AnalysisExplanation", "RecognizerResult", "DictAnalyzerResult", @@ -49,4 +51,5 @@ "ContextAwareEnhancer", "LemmaContextAwareEnhancer", "BatchAnalyzerEngine", + "ImprovablePatternRecognizer", ] diff --git a/presidio-analyzer/presidio_analyzer/improvable_pattern.py b/presidio-analyzer/presidio_analyzer/improvable_pattern.py new file mode 100644 index 000000000..45599829e --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/improvable_pattern.py @@ -0,0 +1,37 @@ +from presidio_analyzer import Pattern +from typing import Optional, Callable +from regex import Match +from .analysis_explanation import AnalysisExplanation + + +class ImprovablePattern(Pattern): + """ + A class that represents a regex pattern. + + :param name: the name of the pattern + :param regex: the regex pattern to detect + :param score: the pattern's strength (value varies 0-1) + :param improve_score_fn: a function that improves the score of the analysis explanation + based on the regex match info. + Can be used when it is needed to improve the score based on detected groups in the regex match + or any logic applied only to this pattern. 
+ """ + + def __init__( + self, + name: str, + regex: str, + score: float, + improve_score_fn: Optional[ + Callable[["ImprovablePattern", str, Match, AnalysisExplanation], None] + ] = None, + ) -> None: + + super().__init__(name, regex, score) + self.improve_score_fn = improve_score_fn + + def improve_score( + self, matched_text: str, match: Match, analysis_explanation: AnalysisExplanation + ) -> None: + if self.improve_score_fn: + self.improve_score_fn(self, matched_text, match, analysis_explanation) diff --git a/presidio-analyzer/presidio_analyzer/improvable_pattern_recognizer.py b/presidio-analyzer/presidio_analyzer/improvable_pattern_recognizer.py new file mode 100644 index 000000000..57ca2b2e9 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/improvable_pattern_recognizer.py @@ -0,0 +1,248 @@ +import datetime +import logging +from typing import List, Dict +from regex import Match + +import regex as re + +from presidio_analyzer import ( + LocalRecognizer, + Pattern, + RecognizerResult, + EntityRecognizer, + AnalysisExplanation, + ImprovablePattern, +) + +from presidio_analyzer.nlp_engine import NlpArtifacts + +logger = logging.getLogger("presidio-analyzer") + + +class ImprovablePatternRecognizer(LocalRecognizer): + """ + PII entity recognizer using regular expressions or deny-lists. + Analysis explanations can be improved by a pattern or by the recognizer. 
+ + :param patterns: A list of patterns to detect + :param deny_list: A list of words to detect, + in case our recognizer uses a predefined list of words (deny list) + :param context: list of context words + :param deny_list_score: confidence score for a term + identified using a deny-list + """ + + def __init__( + self, + supported_entity: str, + name: str = None, + supported_language: str = "en", + patterns: List[ImprovablePattern] = None, + deny_list: List[str] = None, + context: List[str] = None, + deny_list_score: float = 1.0, + version: str = "0.0.1", + ): + + if not supported_entity: + raise ValueError("Pattern recognizer should be initialized with entity") + + if not patterns and not deny_list: + raise ValueError( + "Pattern recognizer should be initialized with patterns" + " or with deny list" + ) + + super().__init__( + supported_entities=[supported_entity], + supported_language=supported_language, + name=name, + version=version, + ) + if patterns is None: + self.patterns = [] + else: + self.patterns = patterns + self.context = context + self.deny_list_score = deny_list_score + + if deny_list: + deny_list_pattern = self._deny_list_to_regex(deny_list) + self.patterns.append(deny_list_pattern) + self.deny_list = deny_list + else: + self.deny_list = [] + + def load(self): # noqa D102 + pass + + def analyze( + self, + text: str, + entities: List[str], + nlp_artifacts: NlpArtifacts = None, + regex_flags: int = None, + ) -> List[RecognizerResult]: + """ + Analyzes text to detect PII using regular expressions or deny-lists. 
+ + :param text: Text to be analyzed + :param entities: Entities this recognizer can detect + :param nlp_artifacts: Output values from the NLP engine + :param regex_flags: + :return: + """ + results = [] + + if self.patterns: + pattern_result = self.__analyze_patterns(text, regex_flags) + results.extend(pattern_result) + + return results + + def _deny_list_to_regex(self, deny_list: List[str]) -> ImprovablePattern: + """ + Convert a list of words to a matching regex. + + To be analyzed by the analyze method as any other regex patterns. + + :param deny_list: the list of words to detect + :return: the regex of the words for detection + """ + # Escape deny list elements as preparation for regex + escaped_deny_list = [re.escape(element) for element in deny_list] + regex = r"(?:^|(?<=\W))(" + "|".join(escaped_deny_list) + r")(?:(?=\W)|$)" + return ImprovablePattern(name="deny_list", regex=regex, score=self.deny_list_score) + + @staticmethod + def build_regex_explanation( + recognizer_name: str, + pattern_name: str, + pattern: str, + original_score: float, + validation_result: bool, + ) -> AnalysisExplanation: + """ + Construct an explanation for why this entity was detected. + + :param recognizer_name: Name of recognizer detecting the entity + :param pattern_name: Regex pattern name which detected the entity + :param pattern: Regex pattern logic + :param original_score: Score given by the recognizer + :param validation_result: Whether validation was used and its result + :return: Analysis explanation + """ + explanation = AnalysisExplanation( + recognizer=recognizer_name, + original_score=original_score, + pattern_name=pattern_name, + pattern=pattern, + validation_result=validation_result, + ) + return explanation + + def __analyze_patterns( + self, text: str, flags: int = None + ) -> List[RecognizerResult]: + """ + Evaluate all patterns in the provided text. 
+ + Including words in the provided deny-list + + :param text: text to analyze + :param flags: regex flags + :return: A list of RecognizerResult + """ + flags = flags if flags else re.DOTALL | re.MULTILINE + results = [] + for pattern in self.patterns: + match_start_time = datetime.datetime.now() + matches = re.finditer(pattern.regex, text, flags=flags) + match_time = datetime.datetime.now() - match_start_time + logger.debug( + "--- match_time[%s]: %s.%s seconds", + pattern.name, + match_time.seconds, + match_time.microseconds, + ) + + for match in matches: + start, end = match.span() + + current_match = text[start:end] + + # Skip empty results + if current_match == "": + continue + + description = self.build_regex_explanation( + self.name, pattern.name, pattern.regex, pattern.score, False + ) + + pattern.improve_score(current_match, match, description) + self.improve_score(pattern, current_match, match, description) + + pattern_result = RecognizerResult( + entity_type=self.supported_entities[0], + start=start, + end=end, + score=description.score, + analysis_explanation=description, + recognition_metadata={ + RecognizerResult.RECOGNIZER_NAME_KEY: self.name, + RecognizerResult.RECOGNIZER_IDENTIFIER_KEY: self.id, + }, + ) + + if pattern_result.score > EntityRecognizer.MIN_SCORE: + results.append(pattern_result) + + # Update analysis explanation score following validation or invalidation + description.score = pattern_result.score + + results = EntityRecognizer.remove_duplicates(results) + return results + + def to_dict(self) -> Dict: + """Serialize instance into a dictionary.""" + return_dict = super().to_dict() + + return_dict["patterns"] = [pat.to_dict() for pat in self.patterns] + return_dict["deny_list"] = self.deny_list + return_dict["context"] = self.context + return_dict["supported_entity"] = return_dict["supported_entities"][0] + del return_dict["supported_entities"] + + return return_dict + + @classmethod + def from_dict(cls, entity_recognizer_dict: Dict) -> 
"ImprovablePatternRecognizer": + """Create instance from a serialized dict.""" + patterns = entity_recognizer_dict.get("patterns") + if patterns: + patterns_list = [ImprovablePattern.from_dict(pat) for pat in patterns] + entity_recognizer_dict["patterns"] = patterns_list + + return cls(**entity_recognizer_dict) + + def improve_score( + self, + pattern: ImprovablePattern, + matched_text: str, + match: Match, + analysis_explanation: AnalysisExplanation, + ): + """ + Logic to improve the recognizer score. + + Override this method if an improvement + of the score at recognizer level is needed. It will be applied + to all patterns. + + :param pattern: the pattern that matched. + :param matched_text: the text matched. + :param match: the regex match info. + :param analysis_explanation: the analysis explanation that should be improved. + Modify the score and explanation. + """ + pass diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_itin_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_itin_recognizer.py index 21618dc27..1bf200b5c 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_itin_recognizer.py +++ b/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_itin_recognizer.py @@ -1,9 +1,41 @@ from typing import Optional, List +from presidio_analyzer import ( + AnalysisExplanation, + ImprovablePatternRecognizer, + ImprovablePattern, +) -from presidio_analyzer import Pattern, PatternRecognizer +from regex import Match -class UsItinRecognizer(PatternRecognizer): +def improve_itin_pattern( + pattern: ImprovablePattern, matched_text: str, match: Match, analysis_explanation: AnalysisExplanation +): + """ + Change the score of the itin by checking if it contains '-' or ' ' characters as separators. 
+ """ + first_separator = match.group(1) + second_separator = match.group(2) + + if first_separator and second_separator: + return + + if not first_separator and not second_separator: + analysis_explanation.pattern_name = "Itin (weak)" + analysis_explanation.set_improved_score(0.3) + analysis_explanation.append_textual_explanation_line( + "Weak pattern. No separators" + ) + return + + analysis_explanation.pattern_name = "Itin (very weak)" + analysis_explanation.set_improved_score(0.05) + analysis_explanation.append_textual_explanation_line( + "Very Weak pattern. Only one separator" + ) + + +class UsItinRecognizer(ImprovablePatternRecognizer): """ Recognizes US ITIN (Individual Taxpayer Identification Number) using regex. @@ -14,20 +46,11 @@ class UsItinRecognizer(PatternRecognizer): """ PATTERNS = [ - Pattern( - "Itin (very weak)", - r"\b9\d{2}[- ](5\d|6[0-5]|7\d|8[0-8]|9([0-2]|[4-9]))\d{4}\b|\b9\d{2}(5\d|6[0-5]|7\d|8[0-8]|9([0-2]|[4-9]))[- ]\d{4}\b", # noqa: E501 - 0.05, - ), - Pattern( - "Itin (weak)", - r"\b9\d{2}(5\d|6[0-5]|7\d|8[0-8]|9([0-2]|[4-9]))\d{4}\b", # noqa: E501 - 0.3, - ), - Pattern( + ImprovablePattern( "Itin (medium)", - r"\b9\d{2}[- ](5\d|6[0-5]|7\d|8[0-8]|9([0-2]|[4-9]))[- ]\d{4}\b", # noqa: E501 + r"\b9\d{2}([- ]?)(?:5\d|6[0-5]|7\d|8[0-8]|9(?:[0-2]|[4-9]))([- ]?)\d{4}\b", 0.5, + improve_itin_pattern, ), ] @@ -35,7 +58,7 @@ class UsItinRecognizer(PatternRecognizer): def __init__( self, - patterns: Optional[List[Pattern]] = None, + patterns: Optional[List[ImprovablePattern]] = None, context: Optional[List[str]] = None, supported_language: str = "en", supported_entity: str = "US_ITIN", diff --git a/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_ssn_recognizer.py b/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_ssn_recognizer.py index 011b351fa..ee394439e 100644 --- a/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_ssn_recognizer.py +++ 
b/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_ssn_recognizer.py @@ -1,10 +1,44 @@ -from collections import defaultdict from typing import List, Optional +from presidio_analyzer import ( + AnalysisExplanation, + EntityRecognizer, + ImprovablePatternRecognizer, + ImprovablePattern, +) +from presidio_analyzer.string_sanitizers import StringSanitizer, TranslateSanitizer +from regex import Match -from presidio_analyzer import Pattern, PatternRecognizer +def improve_ssn_pattern( + pattern: ImprovablePattern, matched_text: str, match: Match, analysis_explanation: AnalysisExplanation +): + """ + Change the score of the ssn by checking if contains separator characters. + """ + first_separator = match.group(1) + second_separator = match.group(2) + analysis_explanation.set_improved_score(0.05) + + if first_separator and second_separator: + if first_separator != second_separator: + analysis_explanation.set_improved_score(EntityRecognizer.MIN_SCORE) + else: + analysis_explanation.set_improved_score(pattern.score) + elif not first_separator and not second_separator: + analysis_explanation.pattern_name = "SSN4 (very weak)" + elif first_separator: + if first_separator == "-": + analysis_explanation.pattern_name = "SSN2 (very weak)" + else: + analysis_explanation.set_improved_score(EntityRecognizer.MIN_SCORE) + else: + if second_separator == "-": + analysis_explanation.pattern_name = "SSN1 (very weak)" + else: + analysis_explanation.set_improved_score(EntityRecognizer.MIN_SCORE) -class UsSsnRecognizer(PatternRecognizer): + +class UsSsnRecognizer(ImprovablePatternRecognizer): """Recognize US Social Security Number (SSN) using regex. 
:param patterns: List of patterns to be used by this recognizer @@ -14,13 +48,12 @@ class UsSsnRecognizer(PatternRecognizer): """ PATTERNS = [ - Pattern("SSN1 (very weak)", r"\b([0-9]{5})-([0-9]{4})\b", 0.05), # noqa E501 - Pattern("SSN2 (very weak)", r"\b([0-9]{3})-([0-9]{6})\b", 0.05), # noqa E501 - Pattern( - "SSN3 (very weak)", r"\b(([0-9]{3})-([0-9]{2})-([0-9]{4}))\b", 0.05 - ), # noqa E501 - Pattern("SSN4 (very weak)", r"\b[0-9]{9}\b", 0.05), - Pattern("SSN5 (medium)", r"\b([0-9]{3})[- .]([0-9]{2})[- .]([0-9]{4})\b", 0.5), + ImprovablePattern( + "SSN5 (medium)", + r"\b[0-9]{3}([\.\- ])?[0-9]{2}([\.\- ])?[0-9]{4}\b", + 0.5, + improve_ssn_pattern, + ) ] CONTEXT = [ @@ -36,10 +69,11 @@ class UsSsnRecognizer(PatternRecognizer): def __init__( self, - patterns: Optional[List[Pattern]] = None, + patterns: Optional[List[ImprovablePattern]] = None, context: Optional[List[str]] = None, supported_language: str = "en", supported_entity: str = "US_SSN", + sanitizer: Optional[StringSanitizer] = None, ): patterns = patterns if patterns else self.PATTERNS context = context if context else self.CONTEXT @@ -49,24 +83,20 @@ def __init__( context=context, supported_language=supported_language, ) + self.sanitizer = sanitizer or TranslateSanitizer({".": "", "-": "", " ": ""}) - def invalidate_result(self, pattern_text: str) -> bool: - """ - Check if the pattern text cannot be validated as a US_SSN entity. 
- - :param pattern_text: Text detected as pattern by regex - :return: True if invalidated - """ - # if there are delimiters, make sure both delimiters are the same - delimiter_counts = defaultdict(int) - for c in pattern_text: - if c in (".", "-", " "): - delimiter_counts[c] += 1 - if len(delimiter_counts.keys()) > 1: - # mismatched delimiters - return True + def improve_score( + self, + pattern: ImprovablePattern, + matched_text: str, + match: Match, + analysis_explanation: AnalysisExplanation, + ): + sanitized_value = self.sanitizer.sanitize(matched_text) + if self.is_invalid_ssn(sanitized_value): + analysis_explanation.set_improved_score(EntityRecognizer.MIN_SCORE) - only_digits = "".join(c for c in pattern_text if c.isdigit()) + def is_invalid_ssn(self, only_digits: str) -> bool: if all(only_digits[0] == c for c in only_digits): # cannot be all same digit return True diff --git a/presidio-analyzer/presidio_analyzer/string_sanitizers/__init__.py b/presidio-analyzer/presidio_analyzer/string_sanitizers/__init__.py new file mode 100644 index 000000000..8545f6493 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/string_sanitizers/__init__.py @@ -0,0 +1,18 @@ +"""String sanitizers init.""" +from .string_sanitizer import ( + StringSanitizer, + RegexReplaceSanitizer, + TranslateSanitizer, + WhiteSpaceSanitizer, + HyphenSanitizer, + HyphenWhiteSpaceSanitizer, +) + +__all__ = [ + "StringSanitizer", + "RegexReplaceSanitizer", + "TranslateSanitizer", + "WhiteSpaceSanitizer", + "HyphenSanitizer", + "HyphenWhiteSpaceSanitizer", +] diff --git a/presidio-analyzer/presidio_analyzer/string_sanitizers/string_sanitizer.py b/presidio-analyzer/presidio_analyzer/string_sanitizers/string_sanitizer.py new file mode 100644 index 000000000..2a009a255 --- /dev/null +++ b/presidio-analyzer/presidio_analyzer/string_sanitizers/string_sanitizer.py @@ -0,0 +1,59 @@ +import regex as re + + +class StringSanitizer: + """Cleans a string.""" + + def sanitize(self, text: str) -> str: + return 
text + + +class RegexReplaceSanitizer(StringSanitizer): + """ + Replace parts of a string using a regex to search the term to replace. + """ + + def __init__(self, regex: str, replace: str) -> None: + self.regex = regex + self.replace = replace + + def sanitize(self, text: str) -> str: + return re.sub(self.regex, self.replace, text) + + +class TranslateSanitizer(StringSanitizer): + """ + Replace characters of a string using a translate table. + """ + + def __init__(self, *trans_table) -> None: + """ + Build sanitized using str.maketrans(...) params. + + See https://docs.python.org/3/library/stdtypes.html#str.maketrans + """ + self.trans_table = str.maketrans(*trans_table) + + def sanitize(self, text: str) -> str: + return text.translate(self.trans_table) + + +class WhiteSpaceSanitizer(TranslateSanitizer): + """Removes all white spaces from the string""" + + def __init__(self) -> None: + super().__init__({" ": ""}) + + +class HyphenSanitizer(TranslateSanitizer): + """Removes all '-' characters from the string""" + + def __init__(self) -> None: + super().__init__({"-": ""}) + + +class HyphenWhiteSpaceSanitizer(TranslateSanitizer): + """Removes all '-' or white space characters from the string""" + + def __init__(self) -> None: + super().__init__({"-": "", " ": ""}) diff --git a/presidio-analyzer/tests/test_us_itin_recognizer.py b/presidio-analyzer/tests/test_us_itin_recognizer.py index 9f4326646..67c8215ec 100644 --- a/presidio-analyzer/tests/test_us_itin_recognizer.py +++ b/presidio-analyzer/tests/test_us_itin_recognizer.py @@ -18,11 +18,17 @@ def entities(): "text, expected_len, expected_positions, expected_score_ranges", [ # fmt: off - ("911-701234 91170-1234", 2, ((0, 10), (11, 21),), ((0.0, 0.3), (0.0, 0.3),),), + ("911-701234 91170-1234", 2, + ((0, 10), (11, 21),), + ((0.0, 0.3), (0.0, 0.3),),), + ("911 701234 91170 1234", 2, + ((0, 10), (11, 21),), + ((0.0, 0.3), (0.0, 0.3),),), ("911701234", 1, ((0, 9),), ((0.3, 0.4),),), ("911-70-1234", 1, ((0, 11),), ((0.5, 
0.6),),), ("911-53-1234", 1, ((0, 11),), ((0.5, 0.6),),), ("911-64-1234", 1, ((0, 11),), ((0.5, 0.6),),), + ("911 63 7534", 1, ((0, 11),), ((0.5, 0.6),),), ("911-89-1234", 0, (), (),), ("my tax id 911-89-1234", 0, (), (),), # fmt: on