# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Module containing the class handling the import
of the special phrases.

Phrases are analyzed and imported into the database.

Phrases already present in the database which are no
longer valid are removed.
"""
from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
import logging
import re

from psycopg.sql import Identifier, SQL

from ...typing import Protocol
from ...config import Configuration
from ...db.connection import Connection, drop_tables, index_exists
from .importer_statistics import SpecialPhrasesImporterStatistics
from .special_phrase import SpecialPhrase
from ...tokenizer.base import AbstractTokenizer

LOG = logging.getLogger()


def _classtype_table(phrase_class: str, phrase_type: str) -> str:
    """ Return the name of the table for the given class and type.
    """
    return f'place_classtype_{phrase_class}_{phrase_type}'


class SpecialPhraseLoader(Protocol):
    """ Protocol for classes implementing a loader for special phrases.
    """

    def generate_phrases(self) -> Iterable[SpecialPhrase]:
        """ Generates all special phrase terms this loader can produce.
        """


class SPImporter:
    """
    Class handling the process of importing special phrases into the database.

    Takes a special phrase loader which loads the phrases from an external source.
    """
    def __init__(self, config: Configuration, conn: Connection,
                 sp_loader: SpecialPhraseLoader) -> None:
        self.config = config
        self.db_connection = conn
        self.sp_loader = sp_loader
        self.statistics_handler = SpecialPhrasesImporterStatistics()
        self.black_list, self.white_list = self._load_white_and_black_lists()
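        # Class and type become part of SQL table names, so restrict them to
        # plain word characters.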
        self.sanity_check_pattern = re.compile(r'^\w+$')
        # This set will contain all phrases to be added.
        # It contains tuples with the following format: (label, class, type, operator)
        self.word_phrases: Set[Tuple[str, str, str, str]] = set()
        # This set will contain all existing place_classtype tables which don't match any
        # special phrase class/type on the wiki.
        self.table_phrases_to_delete: Set[str] = set()

    def get_classtype_pairs(self, min: int = 0) -> Set[Tuple[str, str]]:
        """
        Return the set of class/type combinations found in the database,
        restricted to combinations which occur at least min times.

        The default of 0 allows every combination in the database.
        """
        db_combinations = set()
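
        # Count how many placex entries exist for each class/type combination
        # and keep only those that reach the requested minimum.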
        query = f"""
            SELECT class AS cls, type AS typ
            FROM placex
            GROUP BY class, type
            HAVING COUNT(*) >= {min}
        """

        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL(query))
            for row in db_cursor:
                db_combinations.add((row[0], row[1]))

        return db_combinations

    def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool,
                       min: int = 0) -> None:
        """
        Iterate through all SpecialPhrases extracted from the
        loader and import them into the database.

        If should_replace is set to True, only the loaded phrases
        will be kept in the database. All other phrases already
        in the database will be removed.
        """
        LOG.warning('Special phrases importation starting')
        self._fetch_existing_place_classtype_tables()

        # Store pairs of class/type for further processing
        class_type_pairs = set()

        for phrase in self.sp_loader.generate_phrases():
            result = self._process_phrase(phrase)
            if result:
                class_type_pairs.add(result)

        self._create_classtype_table_and_indexes(class_type_pairs, min)
        if should_replace:
            self._remove_non_existent_tables_from_db()
        self.db_connection.commit()
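
        # Hand the collected phrases over to the tokenizer so that they become
        # searchable special phrases.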
        with tokenizer.name_analyzer() as analyzer:
            analyzer.update_special_phrases(self.word_phrases, should_replace)

        LOG.warning('Import done.')
        self.statistics_handler.notify_import_done()

    def _fetch_existing_place_classtype_tables(self) -> None:
        """
        Fetch the names of all existing place_classtype tables and
        fill the table_phrases_to_delete set of the class.
        """
        query = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name like 'place_classtype_%';
        """
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL(query))
            for row in db_cursor:
                self.table_phrases_to_delete.add(row[0])

    def _load_white_and_black_lists(self) \
            -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
        """
        Load white and black lists from phrase-settings.json.
        """
        settings = self.config.load_sub_configuration('phrase-settings.json')

        return settings['blackList'], settings['whiteList']

    def _check_sanity(self, phrase: SpecialPhrase) -> bool:
        """
        Check the sanity of the given inputs in case somebody added garbage
        to the wiki. If a bad class/type is detected, a warning is logged and
        the phrase is not imported.
        """
        class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
        type_matchs = self.sanity_check_pattern.findall(phrase.p_type)

        if not class_matchs or not type_matchs:
            LOG.warning("Bad class/type: %s=%s. It will not be imported",
                        phrase.p_class, phrase.p_type)
            return False
        return True

    def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
        """
        Process the given phrase by checking it against the black and white
        lists and the sanity check.
        Return the class/type pair corresponding to the phrase, or None if
        the phrase has to be skipped.
        """
        # blacklisting: disallow certain class/type combinations
        if phrase.p_class in self.black_list.keys() \
           and phrase.p_type in self.black_list[phrase.p_class]:
            return None

        # whitelisting: if class is in whitelist, allow only tags in the list
        if phrase.p_class in self.white_list.keys() \
           and phrase.p_type not in self.white_list[phrase.p_class]:
            return None

        # sanity check, in case somebody added garbage in the wiki
        if not self._check_sanity(phrase):
            self.statistics_handler.notify_one_phrase_invalid()
            return None

        self.word_phrases.add((phrase.p_label, phrase.p_class,
                               phrase.p_type, phrase.p_operator))

        return (phrase.p_class, phrase.p_type)

    def _create_classtype_table_and_indexes(self,
                                            class_type_pairs: Iterable[Tuple[str, str]],
                                            min: int = 0) -> None:
        """
        Create a place_classtype table for each given pair.
        Also create indexes on place_id and centroid.
        """
        LOG.warning('Create tables and indexes...')

        sql_tablespace = self.config.TABLESPACE_AUX_DATA
        if sql_tablespace:
            sql_tablespace = ' TABLESPACE ' + sql_tablespace
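
        # Temporary index on placex to speed up the SELECTs that fill the
        # per-class/type tables below; it is dropped again at the end.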
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
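
        # Determine which class/type combinations occur often enough in placex
        # to be kept; with min == 0 nothing is filtered out.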
        allowed_special_phrases = self.get_classtype_pairs(min)

        for pair in class_type_pairs:
            phrase_class = pair[0]
            phrase_type = pair[1]

            # Will only filter if min is not 0
            if min and (phrase_class, phrase_type) not in allowed_special_phrases:
                LOG.warning("Skipping phrase %s=%s: not in allowed special phrases",
                            phrase_class, phrase_type)
                continue

            table_name = _classtype_table(phrase_class, phrase_type)

            if table_name in self.table_phrases_to_delete:
                self.statistics_handler.notify_one_table_ignored()
                # Remove this table from the ones to delete as it matches a
                # class/type still existing in the special phrases of the wiki.
                self.table_phrases_to_delete.remove(table_name)
                # So the table and its indexes do not need to be created again.
                continue

            self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)

            self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)

            # Grant read access to the web user.
            self._grant_access_to_webuser(phrase_class, phrase_type)

            self.statistics_handler.notify_one_table_created()

        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute("DROP INDEX idx_placex_classtype")

    def _create_place_classtype_table(self, sql_tablespace: str,
                                      phrase_class: str, phrase_type: str) -> None:
        """
        Create the place_classtype table for the given phrase_class/phrase_type
        if it does not exist yet.
        """
        table_name = _classtype_table(phrase_class, phrase_type)
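        # Materialise place_id and centroid of every placex entry with this
        # class/type combination.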
        with self.db_connection.cursor() as cur:
            cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
                                 SELECT place_id AS place_id,
                                        st_centroid(geometry) AS centroid
                                 FROM placex
                                 WHERE class = %s AND type = %s
                            """).format(Identifier(table_name), SQL(sql_tablespace)),
                        (phrase_class, phrase_type))

    def _create_place_classtype_indexes(self, sql_tablespace: str,
                                        phrase_class: str, phrase_type: str) -> None:
        """
        Create indexes on centroid and place_id for the place_classtype table.
        """
        index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
        base_table = _classtype_table(phrase_class, phrase_type)
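        # Spatial (GIST) index on the centroid for geometric lookups.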
        if not index_exists(self.db_connection, index_prefix + 'centroid'):
            with self.db_connection.cursor() as db_cursor:
                db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
                                  .format(Identifier(index_prefix + 'centroid'),
                                          Identifier(base_table),
                                          SQL(sql_tablespace)))
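
        # Btree index on place_id for fast lookups of individual places.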
        if not index_exists(self.db_connection, index_prefix + 'place_id'):
            with self.db_connection.cursor() as db_cursor:
                db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
                                  .format(Identifier(index_prefix + 'place_id'),
                                          Identifier(base_table),
                                          SQL(sql_tablespace)))

    def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
        """
        Grant read access on the place_classtype table to the web user.
        """
        table_name = _classtype_table(phrase_class, phrase_type)
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
                              .format(Identifier(table_name),
                                      Identifier(self.config.DATABASE_WEBUSER)))

    def _remove_non_existent_tables_from_db(self) -> None:
        """
        Remove special phrases which no longer exist on the wiki.
        Delete the corresponding place_classtype tables.
        """
        LOG.warning('Cleaning database...')

        # Delete place_classtype tables corresponding to class/type combinations
        # which are not on the wiki anymore.
        drop_tables(self.db_connection, *self.table_phrases_to_delete)
        for _ in self.table_phrases_to_delete:
            self.statistics_handler.notify_one_table_deleted()