]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/icu_rule_loader.py
use yaml tag syntax to mark include files
[nominatim.git] / nominatim / tokenizer / icu_rule_loader.py
1 """
2 Helper class to create ICU rules from a configuration file.
3 """
4 import io
5 import logging
6 from collections import defaultdict
7 import itertools
8 from pathlib import Path
9
10 import yaml
11 from icu import Transliterator
12
13 from nominatim.errors import UsageError
14
15 LOG = logging.getLogger()
16
17 def _flatten_yaml_list(content):
18     if not content:
19         return []
20
21     if not isinstance(content, list):
22         raise UsageError("List expected in ICU yaml configuration.")
23
24     output = []
25     for ele in content:
26         if isinstance(ele, list):
27             output.extend(_flatten_yaml_list(ele))
28         else:
29             output.append(ele)
30
31     return output
32
33
34 class ICURuleLoader:
35     """ Compiler for ICU rules from a tokenizer configuration file.
36     """
37
38     def __init__(self, configfile):
39         self.configfile = configfile
40         self.compound_suffixes = set()
41         self.abbreviations = defaultdict()
42
43         if configfile.suffix == '.yaml':
44             self._load_from_yaml()
45         else:
46             raise UsageError("Unknown format of tokenizer configuration.")
47
48
49     def get_search_rules(self):
50         """ Return the ICU rules to be used during search.
51             The rules combine normalization and transliteration.
52         """
53         # First apply the normalization rules.
54         rules = io.StringIO()
55         rules.write(self.normalization_rules)
56
57         # Then add transliteration.
58         rules.write(self.transliteration_rules)
59         return rules.getvalue()
60
61     def get_normalization_rules(self):
62         """ Return rules for normalisation of a term.
63         """
64         return self.normalization_rules
65
66     def get_transliteration_rules(self):
67         """ Return the rules for converting a string into its asciii representation.
68         """
69         return self.transliteration_rules
70
71     def get_replacement_pairs(self):
72         """ Return the list of possible compound decompositions with
73             application of abbreviations included.
74             The result is a list of pairs: the first item is the sequence to
75             replace, the second is a list of replacements.
76         """
77         synonyms = defaultdict(set)
78
79         # First add entries for compound decomposition.
80         for suffix in self.compound_suffixes:
81             variants = (suffix + ' ', ' ' + suffix + ' ')
82             for key in variants:
83                 synonyms[key].update(variants)
84
85         for full, abbr in self.abbreviations.items():
86             key = ' ' + full + ' '
87             # Entries in the abbreviation list always apply to full words:
88             synonyms[key].update((' ' + a + ' ' for a in abbr))
89             # Replacements are optional, so add a noop
90             synonyms[key].add(key)
91
92             if full in self.compound_suffixes:
93                 # Full word abbreviating to compunded version.
94                 synonyms[key].update((a + ' ' for a in abbr))
95
96                 key = full + ' '
97                 # Uncompunded suffix abbrevitating to decompounded version.
98                 synonyms[key].update((' ' + a + ' ' for a in abbr))
99                 # Uncompunded suffix abbrevitating to compunded version.
100                 synonyms[key].update((a + ' ' for a in abbr))
101
102         # sort the resulting list by descending length (longer matches are prefered).
103         sorted_keys = sorted(synonyms.keys(), key=len, reverse=True)
104
105         return [(k, list(synonyms[k])) for k in sorted_keys]
106
107     def _yaml_include_representer(self, loader, node):
108         value = loader.construct_scalar(node)
109
110         if Path(value).is_absolute():
111             content = Path(value).read_text()
112         else:
113             content = (self.configfile.parent / value).read_text()
114
115         return yaml.safe_load(content)
116
117
118     def _load_from_yaml(self):
119         yaml.add_constructor('!include', self._yaml_include_representer,
120                              Loader=yaml.SafeLoader)
121         rules = yaml.safe_load(self.configfile.read_text())
122
123         self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
124         self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
125         self._parse_compound_suffix_list(self._get_section(rules, 'compound_suffixes'))
126         self._parse_abbreviation_list(self._get_section(rules, 'abbreviations'))
127
128
129     def _get_section(self, rules, section):
130         """ Get the section named 'section' from the rules. If the section does
131             not exist, raise a usage error with a meaningful message.
132         """
133         if section not in rules:
134             LOG.fatal("Section '%s' not found in tokenizer config '%s'.",
135                       section, str(self.configfile))
136             raise UsageError("Syntax error in tokenizer configuration file.")
137
138         return rules[section]
139
140
141     def _cfg_to_icu_rules(self, rules, section):
142         """ Load an ICU ruleset from the given section. If the section is a
143             simple string, it is interpreted as a file name and the rules are
144             loaded verbatim from the given file. The filename is expected to be
145             relative to the tokenizer rule file. If the section is a list then
146             each line is assumed to be a rule. All rules are concatenated and returned.
147         """
148         content = self._get_section(rules, section)
149
150         if content is None:
151             return ''
152
153         return ';'.join(_flatten_yaml_list(content)) + ';'
154
155
156
157     def _parse_compound_suffix_list(self, rules):
158         if not rules:
159             self.compound_suffixes = set()
160             return
161
162         norm = Transliterator.createFromRules("rule_loader_normalization",
163                                               self.normalization_rules)
164
165         # Make sure all suffixes are in their normalised form.
166         self.compound_suffixes = set((norm.transliterate(s) for s in rules))
167
168
169     def _parse_abbreviation_list(self, rules):
170         self.abbreviations = defaultdict(list)
171
172         if not rules:
173             return
174
175         norm = Transliterator.createFromRules("rule_loader_normalization",
176                                               self.normalization_rules)
177
178         for rule in rules:
179             parts = rule.split('=>')
180             if len(parts) != 2:
181                 LOG.fatal("Syntax error in abbreviation section, line: %s", rule)
182                 raise UsageError("Syntax error in tokenizer configuration file.")
183
184             # Make sure all terms match the normalised version.
185             fullterms = (norm.transliterate(t.strip()) for t in parts[0].split(','))
186             abbrterms = (norm.transliterate(t.strip()) for t in parts[1].split(','))
187
188             for full, abbr in itertools.product(fullterms, abbrterms):
189                 if full and abbr:
190                     self.abbreviations[full].append(abbr)