"""
Helper class to create ICU rules from a configuration file.
"""
import importlib
import io
import itertools
import json
import logging
import re

from icu import Transliterator

from nominatim.config import flatten_config_list
from nominatim.db.properties import set_property, get_property
from nominatim.errors import UsageError
from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
import nominatim.tokenizer.icu_variants as variants

LOG = logging.getLogger()

DBCFG_IMPORT_NORM_RULES = "tokenizer_import_normalisation"
DBCFG_IMPORT_TRANS_RULES = "tokenizer_import_transliteration"
DBCFG_IMPORT_ANALYSIS_RULES = "tokenizer_import_analysis_rules"


def _get_section(rules, section):
    """ Get the section named 'section' from the rules. If the section does
        not exist, raise a usage error with a meaningful message.
    """
    if section not in rules:
        LOG.fatal("Section '%s' not found in tokenizer config.", section)
        raise UsageError("Syntax error in tokenizer configuration file.")

    return rules[section]
38 """ Saves a single variant expansion.
40 An expansion consists of the normalized replacement term and
41 a dicitonary of properties that describe when the expansion applies.
44 def __init__(self, replacement, properties):
45 self.replacement = replacement
46 self.properties = properties or {}
50 """ Compiler for ICU rules from a tokenizer configuration file.
53 def __init__(self, config):
54 rules = config.load_sub_configuration('icu_tokenizer.yaml',
55 config='TOKENIZER_CONFIG')
57 self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
58 self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
59 self.analysis_rules = _get_section(rules, 'token-analysis')
60 self._setup_analysis()
62 # Load optional sanitizer rule set.
63 self.sanitizer_rules = rules.get('sanitizers', [])

    def load_config_from_db(self, conn):
        """ Get previously saved parts of the configuration from the
            database.
        """
        self.normalization_rules = get_property(conn, DBCFG_IMPORT_NORM_RULES)
        self.transliteration_rules = get_property(conn, DBCFG_IMPORT_TRANS_RULES)
        self.analysis_rules = json.loads(get_property(conn, DBCFG_IMPORT_ANALYSIS_RULES))
        self._setup_analysis()

    def save_config_to_db(self, conn):
        """ Save the part of the configuration that cannot be changed into
            the database.
        """
        set_property(conn, DBCFG_IMPORT_NORM_RULES, self.normalization_rules)
        set_property(conn, DBCFG_IMPORT_TRANS_RULES, self.transliteration_rules)
        set_property(conn, DBCFG_IMPORT_ANALYSIS_RULES, json.dumps(self.analysis_rules))

    def make_sanitizer(self):
        """ Create a place sanitizer from the configured rules.
        """
        return PlaceSanitizer(self.sanitizer_rules)

    def make_token_analysis(self):
        """ Create a token analyser from the previously loaded rules.
        """
        return self.analysis[None].create(self.normalization_rules,
                                          self.transliteration_rules)

    def get_search_rules(self):
        """ Return the ICU rules to be used during search.
            The rules combine normalization and transliteration.
        """
        # First apply the normalization rules.
        rules = io.StringIO()
        rules.write(self.normalization_rules)

        # Then add transliteration.
        rules.write(self.transliteration_rules)
        return rules.getvalue()
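
    # Illustration (assumed rule contents, not part of this module): with
    # normalization rules ":: lower ();" and transliteration rules
    # ":: Latin ();", get_search_rules() returns ":: lower ();:: Latin ();",
    # a single ruleset accepted by icu.Transliterator.createFromRules().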

    def get_normalization_rules(self):
        """ Return rules for normalisation of a term.
        """
        return self.normalization_rules

    def get_transliteration_rules(self):
        """ Return the rules for converting a string into its ASCII representation.
        """
        return self.transliteration_rules

    def _setup_analysis(self):
        """ Process the rules used for creating the various token analyzers.
        """
        self.analysis = {}

        if not isinstance(self.analysis_rules, list):
            raise UsageError("Configuration section 'token-analysis' must be a list.")

        for section in self.analysis_rules:
            name = section.get('id', None)
            if name in self.analysis:
                if name is None:
                    LOG.fatal("ICU tokenizer configuration has two default token analyzers.")
                else:
                    LOG.fatal("ICU tokenizer configuration has two token "
                              "analyzers with id '%s'.", name)
                raise UsageError("Syntax error in ICU tokenizer config.")
            self.analysis[name] = TokenAnalyzerRule(section, self.normalization_rules)
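
    # Illustrative 'token-analysis' section (assumed YAML shape; the id and
    # analyzer names are examples only):
    #   token-analysis:
    #     - analyzer: generic
    #     - id: special
    #       analyzer: generic
    # The entry without an 'id' becomes the default analyzer under key None.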

    @staticmethod
    def _cfg_to_icu_rules(rules, section):
        """ Load an ICU ruleset from the given section. If the section is a
            simple string, it is interpreted as a file name and the rules are
            loaded verbatim from the given file. The filename is expected to be
            relative to the tokenizer rule file. If the section is a list then
            each line is assumed to be a rule. All rules are concatenated and returned.
        """
        content = _get_section(rules, section)

        if content is None:
            return ''

        return ';'.join(flatten_config_list(content, section)) + ';'
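
    # Illustration (assumed config content): a section such as
    #   normalization:
    #     - ":: lower ()"
    #     - ":: NFC ()"
    # is flattened and joined into the ruleset ":: lower ();:: NFC ();".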


class TokenAnalyzerRule:
    """ Factory for a single analysis module. The class saves the configuration
        and creates a new token analyzer on request.
    """

    def __init__(self, rules, normalization_rules):
        # Find the analysis module.
        module_name = 'nominatim.tokenizer.token_analysis.' \
                      + _get_section(rules, 'analyzer').replace('-', '_')
        analysis_mod = importlib.import_module(module_name)
        self._mod_create = analysis_mod.create

        # Load the configuration.
        self.config = {}
        self._parse_variant_list(rules.get('variants'), normalization_rules)

    def create(self, normalization_rules, transliteration_rules):
        """ Create an analyzer from the given rules.
        """
        return self._mod_create(normalization_rules,
                                transliteration_rules,
                                self.config)

    def _parse_variant_list(self, rules, normalization_rules):
        vset = set()

        if rules:
            rules = flatten_config_list(rules, 'variants')

            vmaker = _VariantMaker(normalization_rules)

            properties = []
            for section in rules:
                # Create the property field and deduplicate against existing
                # instances.
                props = variants.ICUVariantProperties.from_rules(section)
                for existing in properties:
                    if existing == props:
                        props = existing
                        break
                else:
                    properties.append(props)

                for rule in (section.get('words') or []):
                    vset.update(vmaker.compute(rule, props))

        self.config['variants'] = vset
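
    # Illustrative 'variants' section (assumed YAML shape; the words are
    # examples only):
    #   variants:
    #     - words:
    #         - ~strasse,~str => str
    #         - bahnhof -> bf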
213 """ Generater for all necessary ICUVariants from a single variant rule.
215 All text in rules is normalized to make sure the variants match later.
218 def __init__(self, norm_rules):
219 self.norm = Transliterator.createFromRules("rule_loader_normalization",

    def compute(self, rule, props):
        """ Generator for all ICUVariant tuples from a single variant rule.
        """
        parts = re.split(r'(\|)?([=-])>', rule)
        if len(parts) != 4:
            raise UsageError("Syntax error in variant rule: " + rule)

        decompose = parts[1] is None
        src_terms = [self._parse_variant_word(t) for t in parts[0].split(',')]
        repl_terms = (self.norm.transliterate(t.strip()) for t in parts[3].split(','))

        # If the source should be kept, add a 1:1 replacement.
        if parts[2] == '-':
            for src in src_terms:
                if src:
                    for froms, tos in _create_variants(*src, src[0], decompose):
                        yield variants.ICUVariant(froms, tos, props)

        for src, repl in itertools.product(src_terms, repl_terms):
            if src and repl:
                for froms, tos in _create_variants(*src, repl, decompose):
                    yield variants.ICUVariant(froms, tos, props)
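
    # Rule grammar illustration (hedged): re.split(r'(\|)?([=-])>', rule)
    # turns "~strasse => str" into ['~strasse ', None, '=', ' str'], so
    # decompose stays True and only the replacement is kept. For
    # "bahnhof |-> bf" it yields ['bahnhof ', '|', '-', ' bf']: the '|'
    # switches decomposition off and '-' additionally keeps the original
    # term as a 1:1 variant.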

    def _parse_variant_word(self, name):
        name = name.strip()
        match = re.fullmatch(r'([~^]?)([^~$^]*)([~$]?)', name)
        if match is None or (match.group(1) == '~' and match.group(3) == '~'):
            raise UsageError("Invalid variant word descriptor '{}'".format(name))
        norm_name = self.norm.transliterate(match.group(2))
        if not norm_name:
            return None

        return norm_name, match.group(1), match.group(3)
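
    # Descriptor examples (assuming a lower-casing normalization):
    # "~strasse" -> ('strasse', '~', ''), "^Am" -> ('am', '^', ''),
    # "Weg$" -> ('weg', '', '$'). A '~' on both ends is rejected.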


_FLAG_MATCH = {'^': '^ ',
               '$': ' ^',
               '': ' '}
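
# Note (assumption about how the token analyzers apply these variants): the
# full normalized name is matched in the form '^ name ^', so '^ ' anchors a
# term at the start of the name, ' ^' anchors it at the end, and a plain
# space is an ordinary word boundary.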


def _create_variants(src, preflag, postflag, repl, decompose):
    if preflag == '~':
        postfix = _FLAG_MATCH[postflag]
        # suffix decomposition
        src = src + postfix
        repl = repl + postfix

        yield src, repl
        yield ' ' + src, ' ' + repl

        if decompose:
            yield src, ' ' + repl
            yield ' ' + src, repl
    elif postflag == '~':
        # prefix decomposition
        prefix = _FLAG_MATCH[preflag]
        src = prefix + src
        repl = prefix + repl

        yield src, repl
        yield src + ' ', repl + ' '

        if decompose:
            yield src, repl + ' '
            yield src + ' ', repl
    else:
        prefix = _FLAG_MATCH[preflag]
        postfix = _FLAG_MATCH[postflag]

        yield prefix + src + postfix, prefix + repl + postfix
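
# Worked example (hedged): _create_variants('strasse', '~', '', 'str', True)
# yields ('strasse ', 'str '), (' strasse ', ' str '), ('strasse ', ' str ')
# and (' strasse ', 'str '), i.e. the suffix form may be glued to the
# preceding word or stand alone, on either side of the replacement.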