2 Helper class to create ICU rules from a configuration file.
10 from icu import Transliterator
12 from nominatim.db.properties import set_property, get_property
13 from nominatim.errors import UsageError
14 from nominatim.tokenizer.icu_name_processor import ICUNameProcessor
15 from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
16 import nominatim.tokenizer.icu_variants as variants
18 LOG = logging.getLogger()
20 DBCFG_IMPORT_NORM_RULES = "tokenizer_import_normalisation"
21 DBCFG_IMPORT_TRANS_RULES = "tokenizer_import_transliteration"
22 DBCFG_IMPORT_ANALYSIS_RULES = "tokenizer_import_analysis_rules"
25 def _flatten_config_list(content):
29 if not isinstance(content, list):
30 raise UsageError("List expected in ICU configuration.")
34 if isinstance(ele, list):
35 output.extend(_flatten_config_list(ele))
43 """ Saves a single variant expansion.
45 An expansion consists of the normalized replacement term and
46 a dicitonary of properties that describe when the expansion applies.
49 def __init__(self, replacement, properties):
50 self.replacement = replacement
51 self.properties = properties or {}
55 """ Compiler for ICU rules from a tokenizer configuration file.
58 def __init__(self, config):
59 rules = config.load_sub_configuration('icu_tokenizer.yaml',
60 config='TOKENIZER_CONFIG')
64 self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
65 self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
66 self.analysis_rules = self._get_section(rules, 'variants')
67 self._parse_variant_list()
69 # Load optional sanitizer rule set.
70 self.sanitizer_rules = rules.get('sanitizers', [])
73 def load_config_from_db(self, conn):
74 """ Get previously saved parts of the configuration from the
77 self.normalization_rules = get_property(conn, DBCFG_IMPORT_NORM_RULES)
78 self.transliteration_rules = get_property(conn, DBCFG_IMPORT_TRANS_RULES)
79 self.analysis_rules = json.loads(get_property(conn, DBCFG_IMPORT_ANALYSIS_RULES))
80 self._parse_variant_list()
83 def save_config_to_db(self, conn):
84 """ Save the part of the configuration that cannot be changed into
87 set_property(conn, DBCFG_IMPORT_NORM_RULES, self.normalization_rules)
88 set_property(conn, DBCFG_IMPORT_TRANS_RULES, self.transliteration_rules)
89 set_property(conn, DBCFG_IMPORT_ANALYSIS_RULES, json.dumps(self.analysis_rules))
92 def make_sanitizer(self):
93 """ Create a place sanitizer from the configured rules.
95 return PlaceSanitizer(self.sanitizer_rules)
98 def make_token_analysis(self):
99 """ Create a token analyser from the reviouly loaded rules.
101 return ICUNameProcessor(self.normalization_rules,
102 self.transliteration_rules,
106 def get_search_rules(self):
107 """ Return the ICU rules to be used during search.
108 The rules combine normalization and transliteration.
110 # First apply the normalization rules.
111 rules = io.StringIO()
112 rules.write(self.normalization_rules)
114 # Then add transliteration.
115 rules.write(self.transliteration_rules)
116 return rules.getvalue()
118 def get_normalization_rules(self):
119 """ Return rules for normalisation of a term.
121 return self.normalization_rules
123 def get_transliteration_rules(self):
124 """ Return the rules for converting a string into its asciii representation.
126 return self.transliteration_rules
128 def get_replacement_pairs(self):
129 """ Return the list of possible compound decompositions with
130 application of abbreviations included.
131 The result is a list of pairs: the first item is the sequence to
132 replace, the second is a list of replacements.
138 def _get_section(rules, section):
139 """ Get the section named 'section' from the rules. If the section does
140 not exist, raise a usage error with a meaningful message.
142 if section not in rules:
143 LOG.fatal("Section '%s' not found in tokenizer config.", section)
144 raise UsageError("Syntax error in tokenizer configuration file.")
146 return rules[section]
149 def _cfg_to_icu_rules(self, rules, section):
150 """ Load an ICU ruleset from the given section. If the section is a
151 simple string, it is interpreted as a file name and the rules are
152 loaded verbatim from the given file. The filename is expected to be
153 relative to the tokenizer rule file. If the section is a list then
154 each line is assumed to be a rule. All rules are concatenated and returned.
156 content = self._get_section(rules, section)
161 return ';'.join(_flatten_config_list(content)) + ';'
164 def _parse_variant_list(self):
165 rules = self.analysis_rules
167 self.variants.clear()
172 rules = _flatten_config_list(rules)
174 vmaker = _VariantMaker(self.normalization_rules)
177 for section in rules:
178 # Create the property field and deduplicate against existing
180 props = variants.ICUVariantProperties.from_rules(section)
181 for existing in properties:
182 if existing == props:
186 properties.append(props)
188 for rule in (section.get('words') or []):
189 self.variants.update(vmaker.compute(rule, props))
193 """ Generater for all necessary ICUVariants from a single variant rule.
195 All text in rules is normalized to make sure the variants match later.
198 def __init__(self, norm_rules):
199 self.norm = Transliterator.createFromRules("rule_loader_normalization",
203 def compute(self, rule, props):
204 """ Generator for all ICUVariant tuples from a single variant rule.
206 parts = re.split(r'(\|)?([=-])>', rule)
208 raise UsageError("Syntax error in variant rule: " + rule)
210 decompose = parts[1] is None
211 src_terms = [self._parse_variant_word(t) for t in parts[0].split(',')]
212 repl_terms = (self.norm.transliterate(t.strip()) for t in parts[3].split(','))
214 # If the source should be kept, add a 1:1 replacement
216 for src in src_terms:
218 for froms, tos in _create_variants(*src, src[0], decompose):
219 yield variants.ICUVariant(froms, tos, props)
221 for src, repl in itertools.product(src_terms, repl_terms):
223 for froms, tos in _create_variants(*src, repl, decompose):
224 yield variants.ICUVariant(froms, tos, props)
227 def _parse_variant_word(self, name):
229 match = re.fullmatch(r'([~^]?)([^~$^]*)([~$]?)', name)
230 if match is None or (match.group(1) == '~' and match.group(3) == '~'):
231 raise UsageError("Invalid variant word descriptor '{}'".format(name))
232 norm_name = self.norm.transliterate(match.group(2))
236 return norm_name, match.group(1), match.group(3)
239 _FLAG_MATCH = {'^': '^ ',
244 def _create_variants(src, preflag, postflag, repl, decompose):
246 postfix = _FLAG_MATCH[postflag]
247 # suffix decomposition
249 repl = repl + postfix
252 yield ' ' + src, ' ' + repl
255 yield src, ' ' + repl
256 yield ' ' + src, repl
257 elif postflag == '~':
258 # prefix decomposition
259 prefix = _FLAG_MATCH[preflag]
264 yield src + ' ', repl + ' '
267 yield src, repl + ' '
268 yield src + ' ', repl
270 prefix = _FLAG_MATCH[preflag]
271 postfix = _FLAG_MATCH[postflag]
273 yield prefix + src + postfix, prefix + repl + postfix