]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/icu_rule_loader.py
269faed981abbbb9ffc530bd32d6b38ae0c30df4
[nominatim.git] / nominatim / tokenizer / icu_rule_loader.py
1 """
2 Helper class to create ICU rules from a configuration file.
3 """
4 import io
5 import logging
6 from collections import defaultdict
7 import itertools
8
9 import yaml
10 from icu import Transliterator
11
12 from nominatim.errors import UsageError
13
14 LOG = logging.getLogger()
15
16
17 class ICURuleLoader:
18     """ Compiler for ICU rules from a tokenizer configuration file.
19     """
20
21     def __init__(self, configfile):
22         self.configfile = configfile
23         self.compound_suffixes = set()
24         self.abbreviations = defaultdict()
25
26         if configfile.suffix == '.yaml':
27             self._load_from_yaml()
28         else:
29             raise UsageError("Unknown format of tokenizer configuration.")
30
31
32     def get_search_rules(self):
33         """ Return the ICU rules to be used during search.
34             The rules combine normalization, compound decomposition (including
35             abbreviated compounds) and transliteration.
36         """
37         # First apply the normalization rules.
38         rules = io.StringIO()
39         rules.write(self.normalization_rules)
40
41         # For all compound suffixes: add them in their full and any abbreviated form.
42         suffixes = set()
43         for suffix in self.compound_suffixes:
44             suffixes.add(suffix)
45             suffixes.update(self.abbreviations.get(suffix, []))
46
47         for suffix in sorted(suffixes, key=len, reverse=True):
48             rules.write("'{0} ' > ' {0} ';".format(suffix))
49
50         # Finally add transliteration.
51         rules.write(self.transliteration_rules)
52         return rules.getvalue()
53
54     def get_normalization_rules(self):
55         """ Return rules for normalisation of a term.
56         """
57         return self.normalization_rules
58
59     def get_transliteration_rules(self):
60         """ Return the rules for converting a string into its asciii representation.
61         """
62         return self.transliteration_rules
63
64     def get_replacement_pairs(self):
65         """ Return the list of possible compound decompositions with
66             application of abbreviations included.
67             The result is a list of pairs: the first item is the sequence to
68             replace, the second is a list of replacements.
69         """
70         synonyms = defaultdict(set)
71
72         for full, abbr in self.abbreviations.items():
73             key = ' ' + full + ' '
74             # Entries in the abbreviation list always apply to full words:
75             synonyms[key].update((' ' + a + ' ' for a in abbr))
76             # Replacements are optional, so add a noop
77             synonyms[key].add(key)
78
79         # Entries in the compound list expand to themselves and to
80         # abbreviations.
81         for suffix in self.compound_suffixes:
82             keyset = synonyms[suffix + ' ']
83             keyset.add(' ' + suffix + ' ')
84             keyset.update((' ' + a + ' ' for a in self.abbreviations.get(suffix, [])))
85             # The terms the entries are shortended to, need to be decompunded as well.
86             for abbr in self.abbreviations.get(suffix, []):
87                 synonyms[abbr + ' '].add(' ' + abbr + ' ')
88
89         # sort the resulting list by descending length (longer matches are prefered).
90         sorted_keys = sorted(synonyms.keys(), key=len, reverse=True)
91
92         return [(k, list(synonyms[k])) for k in sorted_keys]
93
94
95     def _load_from_yaml(self):
96         rules = yaml.safe_load(self.configfile.read_text())
97
98         self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
99         self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
100         self._parse_compound_suffix_list(self._get_section(rules, 'compound_suffixes'))
101         self._parse_abbreviation_list(self._get_section(rules, 'abbreviations'))
102
103
104     def _get_section(self, rules, section):
105         """ Get the section named 'section' from the rules. If the section does
106             not exist, raise a usage error with a meaningful message.
107         """
108         if section not in rules:
109             LOG.fatal("Section '%s' not found in tokenizer config '%s'.",
110                       section, str(self.configfile))
111             raise UsageError("Syntax error in tokenizer configuration file.")
112
113         return rules[section]
114
115
116     def _cfg_to_icu_rules(self, rules, section):
117         """ Load an ICU ruleset from the given section. If the section is a
118             simple string, it is interpreted as a file name and the rules are
119             loaded verbatim from the given file. The filename is expected to be
120             relative to the tokenizer rule file. If the section is a list then
121             each line is assumed to be a rule. All rules are concatenated and returned.
122         """
123         content = self._get_section(rules, section)
124
125         if content is None:
126             return ''
127
128         if isinstance(content, str):
129             return (self.configfile.parent / content).read_text().replace('\n', ' ')
130
131         return ';'.join(content) + ';'
132
133
134     def _parse_compound_suffix_list(self, rules):
135         if not rules:
136             self.compound_suffixes = set()
137             return
138
139         norm = Transliterator.createFromRules("rule_loader_normalization",
140                                               self.normalization_rules)
141
142         # Make sure all suffixes are in their normalised form.
143         self.compound_suffixes = set((norm.transliterate(s) for s in rules))
144
145
146     def _parse_abbreviation_list(self, rules):
147         self.abbreviations = defaultdict(list)
148
149         if not rules:
150             return
151
152         norm = Transliterator.createFromRules("rule_loader_normalization",
153                                               self.normalization_rules)
154
155         for rule in rules:
156             parts = rule.split('=>')
157             if len(parts) != 2:
158                 LOG.fatal("Syntax error in abbreviation section, line: %s", rule)
159                 raise UsageError("Syntax error in tokenizer configuration file.")
160
161             # Make sure all terms match the normalised version.
162             fullterms = (norm.transliterate(t.strip()) for t in parts[0].split(','))
163             abbrterms = (norm.transliterate(t.strip()) for t in parts[1].split(','))
164
165             for full, abbr in itertools.product(fullterms, abbrterms):
166                 if full and abbr:
167                     self.abbreviations[full].append(abbr)