# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
    Module containing the class handling the import
    of the special phrases.

    Phrases are analyzed and imported into the database.

    The phrases already present in the database which are not
    valid anymore are removed.
"""
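
# A special phrase couples a search term (for example 'restaurant' or
# 'hotels near') with an OSM class/type pair and an operator; each one is
# represented by a SpecialPhrase object (label, class, type, operator)
# supplied by the configured SpecialPhraseLoader.
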
from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
import logging
import re

from psycopg.sql import Identifier, SQL

from ...typing import Protocol
from ...config import Configuration
from ...db.connection import Connection, drop_tables, index_exists
from .importer_statistics import SpecialPhrasesImporterStatistics
from .special_phrase import SpecialPhrase
from ...tokenizer.base import AbstractTokenizer

LOG = logging.getLogger()
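

# The importer creates one lookup table per class/type combination. The tables
# follow the naming scheme 'place_classtype_<class>_<type>', for example
# 'place_classtype_amenity_restaurant' (illustrative name).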
def _classtype_table(phrase_class: str, phrase_type: str) -> str:
    """ Return the name of the table for the given class and type.
    """
    return f'place_classtype_{phrase_class}_{phrase_type}'


class SpecialPhraseLoader(Protocol):
    """ Protocol for classes implementing a loader for special phrases.
    """

    def generate_phrases(self) -> Iterable[SpecialPhrase]:
        """ Generates all special phrase terms this loader can produce.
        """


class SPImporter:
    """ Class handling the process of special phrases importation into the database.

        Takes an SP loader which loads the phrases from an external source.
    """

    def __init__(self, config: Configuration, conn: Connection,
                 sp_loader: SpecialPhraseLoader) -> None:
        self.config = config
        self.db_connection = conn
        self.sp_loader = sp_loader
        self.statistics_handler = SpecialPhrasesImporterStatistics()
        self.black_list, self.white_list = self._load_white_and_black_lists()
        self.sanity_check_pattern = re.compile(r'^\w+$')
        # This set will contain all phrases to be added.
        # It contains tuples with the following format: (label, class, type, operator)
        self.word_phrases: Set[Tuple[str, str, str, str]] = set()
        # This set will contain all existing place_classtype tables which don't match any
        # special phrases class/type on the wiki.
        self.table_phrases_to_delete: Set[str] = set()

    def get_classtype_pairs(self, min: int = 0) -> Set[Tuple[str, str]]:
        """ Return the set of class/type combinations found in the placex
            table, restricted to combinations which occur more than the
            given number of times.

            The default of 0 means any combination occurring at least once.
        """
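        # Count how often each class/type combination appears in placex and
        # keep those seen more than ``min`` times. Note that ``min`` is
        # interpolated directly into the SQL string, so it is expected to be
        # a plain integer.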
        db_combinations = set()

        query = f"""
            SELECT class AS cls, type AS typ
            FROM placex
            GROUP BY class, type
            HAVING COUNT(*) > {min}
        """

        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL(query))
            for row in db_cursor:
                db_combinations.add((row[0], row[1]))

        return db_combinations

    def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool) -> None:
        """ Iterate through all SpecialPhrases extracted from the
            loader and import them into the database.

            If should_replace is set to True, only the loaded phrases
            will be kept in the database. All other phrases already
            in the database will be removed.
        """
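        # The import runs in three steps: collect the class/type pairs of all
        # valid phrases from the loader, create or refresh the corresponding
        # place_classtype tables (dropping obsolete ones when should_replace
        # is set), and finally hand the phrase terms to the tokenizer.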
        LOG.warning('Special phrases importation starting')
        self._fetch_existing_place_classtype_tables()

        # Store pairs of class/type for further processing
        class_type_pairs = set()

        for phrase in self.sp_loader.generate_phrases():
            result = self._process_phrase(phrase)
            if result:
                class_type_pairs.add(result)

        self._create_classtype_table_and_indexes(class_type_pairs)
        if should_replace:
            self._remove_non_existent_tables_from_db()

        self.db_connection.commit()

        with tokenizer.name_analyzer() as analyzer:
            analyzer.update_special_phrases(self.word_phrases, should_replace)

        LOG.warning('Import done.')
        self.statistics_handler.notify_import_done()

    def _fetch_existing_place_classtype_tables(self) -> None:
        """ Fetch existing place_classtype tables.
            Fill the table_phrases_to_delete set of the class.
        """
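        # Every existing place_classtype_* table starts out as a deletion
        # candidate; tables that still match an imported class/type are taken
        # off the list again in _create_classtype_table_and_indexes().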
        query = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name like 'place_classtype_%';
        """
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL(query))
            for row in db_cursor:
                self.table_phrases_to_delete.add(row[0])

    def _load_white_and_black_lists(self) \
            -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
        """ Load white and black lists from phrase-settings.json.
        """
        settings = self.config.load_sub_configuration('phrase-settings.json')

        return settings['blackList'], settings['whiteList']

    def _check_sanity(self, phrase: SpecialPhrase) -> bool:
        """ Check the sanity of the given inputs in case somebody added
            garbage in the wiki. If a bad class or type is detected, a
            warning is logged and the phrase is not imported.
        """
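        # class and type must consist of word characters only because they
        # are used verbatim in table and index names.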
        class_matches = self.sanity_check_pattern.findall(phrase.p_class)
        type_matches = self.sanity_check_pattern.findall(phrase.p_type)

        if not class_matches or not type_matches:
            LOG.warning("Bad class/type: %s=%s. It will not be imported",
                        phrase.p_class, phrase.p_type)
            return False
        return True

    def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
        """ Process the given phrase by checking the black and white lists
            and its sanity.

            Return the class/type pair corresponding to the phrase or
            None if the phrase is to be skipped.
        """
        # blacklisting: disallow certain class/type combinations
        if phrase.p_class in self.black_list \
           and phrase.p_type in self.black_list[phrase.p_class]:
            return None

        # whitelisting: if class is in whitelist, allow only tags in the list
        if phrase.p_class in self.white_list \
           and phrase.p_type not in self.white_list[phrase.p_class]:
            return None

        # sanity check, in case somebody added garbage in the wiki
        if not self._check_sanity(phrase):
            self.statistics_handler.notify_one_phrase_invalid()
            return None

        self.word_phrases.add((phrase.p_label, phrase.p_class,
                               phrase.p_type, phrase.p_operator))

        return (phrase.p_class, phrase.p_type)

    def _create_classtype_table_and_indexes(self,
                                            class_type_pairs: Iterable[Tuple[str, str]]) -> None:
        """ Create table place_classtype for each given pair.
            Also create indexes on place_id and centroid.
        """
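        # A temporary index on placex(class, type) is created first; it speeds
        # up the per-pair CREATE TABLE ... AS SELECT statements below and is
        # dropped again at the end of this method.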
        LOG.warning('Create tables and indexes...')

        sql_tablespace = self.config.TABLESPACE_AUX_DATA
        if sql_tablespace:
            sql_tablespace = ' TABLESPACE ' + sql_tablespace

        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")

        allowed_special_phrases = self.get_classtype_pairs()

        for pair in class_type_pairs:
            phrase_class = pair[0]
            phrase_type = pair[1]

            if (phrase_class, phrase_type) not in allowed_special_phrases:
                LOG.warning("Skipping phrase %s=%s: not in allowed special phrases",
                            phrase_class, phrase_type)
                continue

            table_name = _classtype_table(phrase_class, phrase_type)

            if table_name in self.table_phrases_to_delete:
                self.statistics_handler.notify_one_table_ignored()
                # Remove this table from the ones to delete as it matches a
                # class/type still existing in the special phrases of the wiki.
                self.table_phrases_to_delete.remove(table_name)
                # So there is no need to create the table and indexes.
                continue

            # Table creation
            self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)

            # Indexes creation
            self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)

            # Grant access on read to the web user.
            self._grant_access_to_webuser(phrase_class, phrase_type)

            self.statistics_handler.notify_one_table_created()

        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute("DROP INDEX idx_placex_classtype")

    def _create_place_classtype_table(self, sql_tablespace: str,
                                      phrase_class: str, phrase_type: str) -> None:
        """ Create the place_classtype table for the given phrase_class/phrase_type
            if it does not exist yet.
        """
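        # The table stores only the place_id and the precomputed centroid of
        # every matching placex row, so that lookups by class/type do not
        # have to scan the full placex table at query time.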
        table_name = _classtype_table(phrase_class, phrase_type)
        with self.db_connection.cursor() as cur:
            cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
                                 SELECT place_id AS place_id,
                                        st_centroid(geometry) AS centroid
                                 FROM placex
                                 WHERE class = %s AND type = %s
                             """).format(Identifier(table_name), SQL(sql_tablespace)),
                        (phrase_class, phrase_type))

    def _create_place_classtype_indexes(self, sql_tablespace: str,
                                        phrase_class: str, phrase_type: str) -> None:
        """ Create indexes on centroid and place_id for the place_classtype table.
        """
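        # The centroid gets a GIST index for spatial queries, place_id a plain
        # btree index for id lookups. Both are skipped if they already exist,
        # so the import can be re-run safely.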
        index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
        base_table = _classtype_table(phrase_class, phrase_type)

        if not index_exists(self.db_connection, index_prefix + 'centroid'):
            with self.db_connection.cursor() as db_cursor:
                db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
                                  .format(Identifier(index_prefix + 'centroid'),
                                          Identifier(base_table),
                                          SQL(sql_tablespace)))

        if not index_exists(self.db_connection, index_prefix + 'place_id'):
            with self.db_connection.cursor() as db_cursor:
                db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
                                  .format(Identifier(index_prefix + 'place_id'),
                                          Identifier(base_table),
                                          SQL(sql_tablespace)))

    def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
        """ Grant read access on the given place_classtype table to the web user.
        """
        table_name = _classtype_table(phrase_class, phrase_type)
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
                              .format(Identifier(table_name),
                                      Identifier(self.config.DATABASE_WEBUSER)))

    def _remove_non_existent_tables_from_db(self) -> None:
        """ Remove special phrases which no longer exist on the wiki.
            Delete the corresponding place_classtype tables.
        """
        LOG.warning('Cleaning database...')

        # Delete place_classtype tables corresponding to class/type
        # combinations which are not on the wiki anymore.
        drop_tables(self.db_connection, *self.table_phrases_to_delete)
        for _ in self.table_phrases_to_delete:
            self.statistics_handler.notify_one_table_deleted()