]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/special_phrases/sp_importer.py
release 5.1.0.post12
[nominatim.git] / src / nominatim_db / tools / special_phrases / sp_importer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Module containing the class handling the import
9     of the special phrases.
10
11     Phrases are analyzed and imported into the database.
12
13     The phrases already present in the database which are not
14     valids anymore are removed.
15 """
16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
17 import logging
18 import re
19 from psycopg.sql import Identifier, SQL
20
21 from ...typing import Protocol
22 from ...config import Configuration
23 from ...db.connection import Connection, drop_tables, index_exists
24 from .importer_statistics import SpecialPhrasesImporterStatistics
25 from .special_phrase import SpecialPhrase
26 from ...tokenizer.base import AbstractTokenizer
27
28 LOG = logging.getLogger()
29
30
31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
32     """ Return the name of the table for the given class and type.
33     """
34     return f'place_classtype_{phrase_class}_{phrase_type}'
35
36
37 class SpecialPhraseLoader(Protocol):
38     """ Protocol for classes implementing a loader for special phrases.
39     """
40
41     def generate_phrases(self) -> Iterable[SpecialPhrase]:
42         """ Generates all special phrase terms this loader can produce.
43         """
44
45
46 class SPImporter():
47     """
48         Class handling the process of special phrases importation into the database.
49
50         Take a sp loader which load the phrases from an external source.
51     """
52     def __init__(self, config: Configuration, conn: Connection,
53                  sp_loader: SpecialPhraseLoader) -> None:
54         self.config = config
55         self.db_connection = conn
56         self.sp_loader = sp_loader
57         self.statistics_handler = SpecialPhrasesImporterStatistics()
58         self.black_list, self.white_list = self._load_white_and_black_lists()
59         self.sanity_check_pattern = re.compile(r'^\w+$')
60         # This set will contain all existing phrases to be added.
61         # It contains tuples with the following format: (label, class, type, operator)
62         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
63         # This set will contain all existing place_classtype tables which doesn't match any
64         # special phrases class/type on the wiki.
65         self.table_phrases_to_delete: Set[str] = set()
66
67     def get_classtype_pairs(self, min: int = 0) -> Set[Tuple[str, str]]:
68         """
69             Returns list of allowed special phrases from the database,
70             restricting to a list of combinations of classes and types
71             which occur equal to or more than a specified amount of times.
72
73             Default value for this is 0, which allows everything in database.
74         """
75         db_combinations = set()
76
77         query = f"""
78         SELECT class AS CLS, type AS typ
79         FROM placex
80         GROUP BY class, type
81         HAVING COUNT(*) >= {min}
82         """
83
84         with self.db_connection.cursor() as db_cursor:
85             db_cursor.execute(SQL(query))
86             for row in db_cursor:
87                 db_combinations.add((row[0], row[1]))
88
89         return db_combinations
90
91     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool,
92                        min: int = 0) -> None:
93         """
94             Iterate through all SpecialPhrases extracted from the
95             loader and import them into the database.
96
97             If should_replace is set to True only the loaded phrases
98             will be kept into the database. All other phrases already
99             in the database will be removed.
100         """
101         LOG.warning('Special phrases importation starting')
102         self._fetch_existing_place_classtype_tables()
103
104         # Store pairs of class/type for further processing
105         class_type_pairs = set()
106
107         for phrase in self.sp_loader.generate_phrases():
108             result = self._process_phrase(phrase)
109             if result:
110                 class_type_pairs.add(result)
111
112         self._create_classtype_table_and_indexes(class_type_pairs, min)
113         if should_replace:
114             self._remove_non_existent_tables_from_db()
115
116         self.db_connection.commit()
117
118         with tokenizer.name_analyzer() as analyzer:
119             analyzer.update_special_phrases(self.word_phrases, should_replace)
120
121         LOG.warning('Import done.')
122         self.statistics_handler.notify_import_done()
123
124     def _fetch_existing_place_classtype_tables(self) -> None:
125         """
126             Fetch existing place_classtype tables.
127             Fill the table_phrases_to_delete set of the class.
128         """
129         query = """
130             SELECT table_name
131             FROM information_schema.tables
132             WHERE table_schema='public'
133             AND table_name like 'place_classtype_%';
134         """
135         with self.db_connection.cursor() as db_cursor:
136             db_cursor.execute(SQL(query))
137             for row in db_cursor:
138                 self.table_phrases_to_delete.add(row[0])
139
140     def _load_white_and_black_lists(self) \
141             -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
142         """
143             Load white and black lists from phrases-settings.json.
144         """
145         settings = self.config.load_sub_configuration('phrase-settings.json')
146
147         return settings['blackList'], settings['whiteList']
148
149     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
150         """
151             Check sanity of given inputs in case somebody added garbage in the wiki.
152             If a bad class/type is detected the system will exit with an error.
153         """
154         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
155         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
156
157         if not class_matchs or not type_matchs:
158             LOG.warning("Bad class/type: %s=%s. It will not be imported",
159                         phrase.p_class, phrase.p_type)
160             return False
161         return True
162
163     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
164         """
165             Processes the given phrase by checking black and white list
166             and sanity.
167             Return the class/type pair corresponding to the phrase.
168         """
169
170         # blacklisting: disallow certain class/type combinations
171         if phrase.p_class in self.black_list.keys() \
172            and phrase.p_type in self.black_list[phrase.p_class]:
173             return None
174
175         # whitelisting: if class is in whitelist, allow only tags in the list
176         if phrase.p_class in self.white_list.keys() \
177            and phrase.p_type not in self.white_list[phrase.p_class]:
178             return None
179
180         # sanity check, in case somebody added garbage in the wiki
181         if not self._check_sanity(phrase):
182             self.statistics_handler.notify_one_phrase_invalid()
183             return None
184
185         self.word_phrases.add((phrase.p_label, phrase.p_class,
186                                phrase.p_type, phrase.p_operator))
187
188         return (phrase.p_class, phrase.p_type)
189
190     def _create_classtype_table_and_indexes(self,
191                                             class_type_pairs: Iterable[Tuple[str, str]],
192                                             min: int = 0) -> None:
193         """
194             Create table place_classtype for each given pair.
195             Also create indexes on place_id and centroid.
196         """
197         LOG.warning('Create tables and indexes...')
198
199         sql_tablespace = self.config.TABLESPACE_AUX_DATA
200         if sql_tablespace:
201             sql_tablespace = ' TABLESPACE ' + sql_tablespace
202
203         with self.db_connection.cursor() as db_cursor:
204             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
205
206         if min:
207             allowed_special_phrases = self.get_classtype_pairs(min)
208
209         for pair in class_type_pairs:
210             phrase_class = pair[0]
211             phrase_type = pair[1]
212
213             # Will only filter if min is not 0
214             if min and (phrase_class, phrase_type) not in allowed_special_phrases:
215                 LOG.warning("Skipping phrase %s=%s: not in allowed special phrases",
216                             phrase_class, phrase_type)
217                 continue
218
219             table_name = _classtype_table(phrase_class, phrase_type)
220
221             if table_name in self.table_phrases_to_delete:
222                 self.statistics_handler.notify_one_table_ignored()
223                 # Remove this table from the ones to delete as it match a
224                 # class/type still existing on the special phrases of the wiki.
225                 self.table_phrases_to_delete.remove(table_name)
226                 # So don't need to create the table and indexes.
227                 continue
228
229             # Table creation
230             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
231
232             # Indexes creation
233             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
234
235             # Grant access on read to the web user.
236             self._grant_access_to_webuser(phrase_class, phrase_type)
237
238             self.statistics_handler.notify_one_table_created()
239
240         with self.db_connection.cursor() as db_cursor:
241             db_cursor.execute("DROP INDEX idx_placex_classtype")
242
243     def _create_place_classtype_table(self, sql_tablespace: str,
244                                       phrase_class: str, phrase_type: str) -> None:
245         """
246             Create table place_classtype of the given phrase_class/phrase_type
247             if doesn't exit.
248         """
249         table_name = _classtype_table(phrase_class, phrase_type)
250         with self.db_connection.cursor() as cur:
251             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
252                                  SELECT place_id AS place_id,
253                                         st_centroid(geometry) AS centroid
254                                  FROM placex
255                                  WHERE class = %s AND type = %s
256                              """).format(Identifier(table_name), SQL(sql_tablespace)),
257                         (phrase_class, phrase_type))
258
259     def _create_place_classtype_indexes(self, sql_tablespace: str,
260                                         phrase_class: str, phrase_type: str) -> None:
261         """
262             Create indexes on centroid and place_id for the place_classtype table.
263         """
264         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
265         base_table = _classtype_table(phrase_class, phrase_type)
266         # Index on centroid
267         if not index_exists(self.db_connection, index_prefix + 'centroid'):
268             with self.db_connection.cursor() as db_cursor:
269                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
270                                   .format(Identifier(index_prefix + 'centroid'),
271                                           Identifier(base_table),
272                                           SQL(sql_tablespace)))
273
274         # Index on place_id
275         if not index_exists(self.db_connection, index_prefix + 'place_id'):
276             with self.db_connection.cursor() as db_cursor:
277                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
278                                   .format(Identifier(index_prefix + 'place_id'),
279                                           Identifier(base_table),
280                                           SQL(sql_tablespace)))
281
282     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
283         """
284             Grant access on read to the table place_classtype for the webuser.
285         """
286         table_name = _classtype_table(phrase_class, phrase_type)
287         with self.db_connection.cursor() as db_cursor:
288             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
289                               .format(Identifier(table_name),
290                                       Identifier(self.config.DATABASE_WEBUSER)))
291
292     def _remove_non_existent_tables_from_db(self) -> None:
293         """
294             Remove special phrases which doesn't exist on the wiki anymore.
295             Delete the place_classtype tables.
296         """
297         LOG.warning('Cleaning database...')
298
299         # Delete place_classtype tables corresponding to class/type which
300         # are not on the wiki anymore.
301         drop_tables(self.db_connection, *self.table_phrases_to_delete)
302         for _ in self.table_phrases_to_delete:
303             self.statistics_handler.notify_one_table_deleted()