]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/icu_rule_loader.py
adapt tests for ICU tokenizer
[nominatim.git] / nominatim / tokenizer / icu_rule_loader.py
1 """
2 Helper class to create ICU rules from a configuration file.
3 """
4 import io
5 import yaml
6 import logging
7 from collections import defaultdict
8 import itertools
9
10 from icu import Transliterator
11
12 from nominatim.errors import UsageError
13
14 LOG = logging.getLogger()
15
16
17 class ICURuleLoader:
18     """ Compiler for ICU rules from a tokenizer configuration file.
19     """
20
21     def __init__(self, configfile):
22         self.configfile = configfile
23
24         if configfile.suffix == '.yaml':
25             self._load_from_yaml()
26         else:
27             raise UsageError("Unknown format of tokenizer configuration.")
28
29
30     def get_search_rules(self):
31         """ Return the ICU rules to be used during search.
32             The rules combine normalization, compound decomposition (including
33             abbreviated compounds) and transliteration.
34         """
35         # First apply the normalization rules.
36         rules = io.StringIO()
37         rules.write(self.normalization_rules)
38
39         # For all compound suffixes: add them in their full and any abbreviated form.
40         suffixes = set()
41         for suffix in self.compound_suffixes:
42             suffixes.add(suffix)
43             suffixes.update(self.abbreviations.get(suffix, []))
44
45         for suffix in sorted(suffixes, key=lambda x:len(x), reverse=True):
46             rules.write("'{0} ' > ' {0} ';".format(suffix))
47
48         # Finally add transliteration.
49         rules.write(self.transliteration_rules)
50         return rules.getvalue()
51
52     def get_normalization_rules(self):
53         """ Return rules for normalisation of a term.
54         """
55         return self.normalization_rules
56
57     def get_transliteration_rules(self):
58         """ Return the rules for converting a string into its asciii representation.
59         """
60         return self.transliteration_rules
61
62     def get_replacement_pairs(self):
63         """ Return the list of possible compound decompositions with
64             application of abbreviations included.
65             The result is a list of pairs: the first item is the sequence to
66             replace, the second is a list of replacements.
67         """
68         synonyms = defaultdict(set)
69
70         for full, abbr in self.abbreviations.items():
71             key = ' ' + full + ' '
72             # Entries in the abbreviation list always apply to full words:
73             synonyms[key].update((' ' + a + ' ' for a in abbr))
74             # Replacements are optional, so add a noop
75             synonyms[key].add(key)
76
77         # Entries in the compound list expand to themselves and to
78         # abbreviations.
79         for suffix in self.compound_suffixes:
80             keyset = synonyms[suffix + ' ']
81             keyset.add(' ' + suffix + ' ')
82             keyset.update((' ' + a + ' ' for a in self.abbreviations.get(suffix, [])))
83             # The terms the entries are shortended to, need to be decompunded as well.
84             for abbr in self.abbreviations.get(suffix, []):
85                 synonyms[abbr + ' '].add(' ' + abbr + ' ')
86
87         # sort the resulting list by descending length (longer matches are prefered).
88         sorted_keys = sorted(synonyms.keys(), key=lambda x: len(x), reverse=True)
89
90         return [(k, list(synonyms[k])) for k in sorted_keys]
91
92
93     def _load_from_yaml(self):
94         rules = yaml.load(self.configfile.read_text())
95
96         self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
97         self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
98         self._parse_compound_suffix_list(self._get_section(rules, 'compound_suffixes'))
99         self._parse_abbreviation_list(self._get_section(rules, 'abbreviations'))
100
101
102     def _get_section(self, rules, section):
103         """ Get the section named 'section' from the rules. If the section does
104             not exist, raise a usage error with a meaningful message.
105         """
106         if section not in rules:
107             LOG.fatal("Section '%s' not found in tokenizer config '%s'.",
108                       section, str(self.configfile))
109             raise UsageError("Syntax error in tokenizer configuration file.")
110
111         return rules[section]
112
113
114     def _cfg_to_icu_rules(self, rules, section):
115         """ Load an ICU ruleset from the given section. If the section is a
116             simple string, it is interpreted as a file name and the rules are
117             loaded verbatim from the given file. The filename is expected to be
118             relative to the tokenizer rule file. If the section is a list then
119             each line is assumed to be a rule. All rules are concatenated and returned.
120         """
121         content = self._get_section(rules, section)
122
123         if isinstance(content, str):
124             return (self.configfile.parent / content).read_text().replace('\n', ' ')
125
126         return ';'.join(content) + ';'
127
128
129     def _parse_compound_suffix_list(self, rules):
130         if not rules:
131             self.compound_suffixes = set()
132             return
133
134         norm = Transliterator.createFromRules("rule_loader_normalization",
135                                               self.normalization_rules)
136
137         # Make sure all suffixes are in their normalised form.
138         self.compound_suffixes = set((norm.transliterate(s) for s in rules))
139
140
141     def _parse_abbreviation_list(self, rules):
142         self.abbreviations = defaultdict(list)
143
144         if not rules:
145             return
146
147         norm = Transliterator.createFromRules("rule_loader_normalization",
148                                               self.normalization_rules)
149
150         for rule in rules:
151             parts = rule.split('=>')
152             if len(parts) != 2:
153                 LOG.fatal("Syntax error in abbreviation section, line: %s", rule)
154                 raise UsageError("Syntax error in tokenizer configuration file.")
155
156             # Make sure all terms match the normalised version.
157             fullterms = (norm.transliterate(t.strip()) for t in parts[0].split(','))
158             abbrterms = (norm.transliterate(t.strip()) for t in parts[1].split(','))
159
160             for full, abbr in itertools.product(fullterms, abbrterms):
161                 self.abbreviations[full].append(abbr)