]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/special_phrases/sp_importer.py
Refactored min and associated tests to follow greater than or equal to logic, so...
[nominatim.git] / src / nominatim_db / tools / special_phrases / sp_importer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Module containing the class handling the import
9     of the special phrases.
10
11     Phrases are analyzed and imported into the database.
12
13     The phrases already present in the database which are not
14     valids anymore are removed.
15 """
16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
17 import logging
18 import re
19 from psycopg.sql import Identifier, SQL
20
21 from ...typing import Protocol
22 from ...config import Configuration
23 from ...db.connection import Connection, drop_tables, index_exists
24 from .importer_statistics import SpecialPhrasesImporterStatistics
25 from .special_phrase import SpecialPhrase
26 from ...tokenizer.base import AbstractTokenizer
27
28 LOG = logging.getLogger()
29
30
31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
32     """ Return the name of the table for the given class and type.
33     """
34     return f'place_classtype_{phrase_class}_{phrase_type}'
35
36
37 class SpecialPhraseLoader(Protocol):
38     """ Protocol for classes implementing a loader for special phrases.
39     """
40
41     def generate_phrases(self) -> Iterable[SpecialPhrase]:
42         """ Generates all special phrase terms this loader can produce.
43         """
44
45
46 class SPImporter():
47     """
48         Class handling the process of special phrases importation into the database.
49
50         Take a sp loader which load the phrases from an external source.
51     """
52     def __init__(self, config: Configuration, conn: Connection,
53                  sp_loader: SpecialPhraseLoader) -> None:
54         self.config = config
55         self.db_connection = conn
56         self.sp_loader = sp_loader
57         self.statistics_handler = SpecialPhrasesImporterStatistics()
58         self.black_list, self.white_list = self._load_white_and_black_lists()
59         self.sanity_check_pattern = re.compile(r'^\w+$')
60         # This set will contain all existing phrases to be added.
61         # It contains tuples with the following format: (label, class, type, operator)
62         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
63         # This set will contain all existing place_classtype tables which doesn't match any
64         # special phrases class/type on the wiki.
65         self.table_phrases_to_delete: Set[str] = set()
66
67     def get_classtype_pairs(self, min: int = 0) -> Set[Tuple[str, str]]:
68         """
69             Returns list of allowed special phrases from the database,
70             restricting to a list of combinations of classes and types
71             which occur equal to or more than a specified amount of times.
72
73             Default value for this is 0, which allows everything in database.
74         """
75         db_combinations = set()
76
77         query = f"""
78         SELECT class AS CLS, type AS typ
79         FROM placex
80         GROUP BY class, type
81         HAVING COUNT(*) >= {min}
82         """
83
84         with self.db_connection.cursor() as db_cursor:
85             db_cursor.execute(SQL(query))
86             for row in db_cursor:
87                 db_combinations.add((row[0], row[1]))
88
89         return db_combinations
90
91     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool, min: int) -> None:
92         """
93             Iterate through all SpecialPhrases extracted from the
94             loader and import them into the database.
95
96             If should_replace is set to True only the loaded phrases
97             will be kept into the database. All other phrases already
98             in the database will be removed.
99         """
100         LOG.warning('Special phrases importation starting')
101         self._fetch_existing_place_classtype_tables()
102
103         # Store pairs of class/type for further processing
104         class_type_pairs = set()
105
106         for phrase in self.sp_loader.generate_phrases():
107             result = self._process_phrase(phrase)
108             if result:
109                 class_type_pairs.add(result)
110
111         self._create_classtype_table_and_indexes(class_type_pairs, min)
112         if should_replace:
113             self._remove_non_existent_tables_from_db()
114
115         self.db_connection.commit()
116
117         with tokenizer.name_analyzer() as analyzer:
118             analyzer.update_special_phrases(self.word_phrases, should_replace)
119
120         LOG.warning('Import done.')
121         self.statistics_handler.notify_import_done()
122
123     def _fetch_existing_place_classtype_tables(self) -> None:
124         """
125             Fetch existing place_classtype tables.
126             Fill the table_phrases_to_delete set of the class.
127         """
128         query = """
129             SELECT table_name
130             FROM information_schema.tables
131             WHERE table_schema='public'
132             AND table_name like 'place_classtype_%';
133         """
134         with self.db_connection.cursor() as db_cursor:
135             db_cursor.execute(SQL(query))
136             for row in db_cursor:
137                 self.table_phrases_to_delete.add(row[0])
138
139     def _load_white_and_black_lists(self) \
140             -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
141         """
142             Load white and black lists from phrases-settings.json.
143         """
144         settings = self.config.load_sub_configuration('phrase-settings.json')
145
146         return settings['blackList'], settings['whiteList']
147
148     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
149         """
150             Check sanity of given inputs in case somebody added garbage in the wiki.
151             If a bad class/type is detected the system will exit with an error.
152         """
153         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
154         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
155
156         if not class_matchs or not type_matchs:
157             LOG.warning("Bad class/type: %s=%s. It will not be imported",
158                         phrase.p_class, phrase.p_type)
159             return False
160         return True
161
162     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
163         """
164             Processes the given phrase by checking black and white list
165             and sanity.
166             Return the class/type pair corresponding to the phrase.
167         """
168
169         # blacklisting: disallow certain class/type combinations
170         if phrase.p_class in self.black_list.keys() \
171            and phrase.p_type in self.black_list[phrase.p_class]:
172             return None
173
174         # whitelisting: if class is in whitelist, allow only tags in the list
175         if phrase.p_class in self.white_list.keys() \
176            and phrase.p_type not in self.white_list[phrase.p_class]:
177             return None
178
179         # sanity check, in case somebody added garbage in the wiki
180         if not self._check_sanity(phrase):
181             self.statistics_handler.notify_one_phrase_invalid()
182             return None
183
184         self.word_phrases.add((phrase.p_label, phrase.p_class,
185                                phrase.p_type, phrase.p_operator))
186
187         return (phrase.p_class, phrase.p_type)
188
189     def _create_classtype_table_and_indexes(self,
190                                             class_type_pairs: Iterable[Tuple[str, str]],
191                                             min: int) -> None:
192         """
193             Create table place_classtype for each given pair.
194             Also create indexes on place_id and centroid.
195         """
196         LOG.warning('Create tables and indexes...')
197
198         sql_tablespace = self.config.TABLESPACE_AUX_DATA
199         if sql_tablespace:
200             sql_tablespace = ' TABLESPACE ' + sql_tablespace
201
202         with self.db_connection.cursor() as db_cursor:
203             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
204
205         allowed_special_phrases = self.get_classtype_pairs(min)
206
207         for pair in class_type_pairs:
208             phrase_class = pair[0]
209             phrase_type = pair[1]
210
211             # Will only filter if min is not 0
212             if min and (phrase_class, phrase_type) not in allowed_special_phrases:
213                 LOG.warning("Skipping phrase %s=%s: not in allowed special phrases",
214                             phrase_class, phrase_type)
215                 continue
216
217             table_name = _classtype_table(phrase_class, phrase_type)
218
219             if table_name in self.table_phrases_to_delete:
220                 self.statistics_handler.notify_one_table_ignored()
221                 # Remove this table from the ones to delete as it match a
222                 # class/type still existing on the special phrases of the wiki.
223                 self.table_phrases_to_delete.remove(table_name)
224                 # So don't need to create the table and indexes.
225                 continue
226
227             # Table creation
228             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
229
230             # Indexes creation
231             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
232
233             # Grant access on read to the web user.
234             self._grant_access_to_webuser(phrase_class, phrase_type)
235
236             self.statistics_handler.notify_one_table_created()
237
238         with self.db_connection.cursor() as db_cursor:
239             db_cursor.execute("DROP INDEX idx_placex_classtype")
240
241     def _create_place_classtype_table(self, sql_tablespace: str,
242                                       phrase_class: str, phrase_type: str) -> None:
243         """
244             Create table place_classtype of the given phrase_class/phrase_type
245             if doesn't exit.
246         """
247         table_name = _classtype_table(phrase_class, phrase_type)
248         with self.db_connection.cursor() as cur:
249             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
250                                  SELECT place_id AS place_id,
251                                         st_centroid(geometry) AS centroid
252                                  FROM placex
253                                  WHERE class = %s AND type = %s
254                              """).format(Identifier(table_name), SQL(sql_tablespace)),
255                         (phrase_class, phrase_type))
256
257     def _create_place_classtype_indexes(self, sql_tablespace: str,
258                                         phrase_class: str, phrase_type: str) -> None:
259         """
260             Create indexes on centroid and place_id for the place_classtype table.
261         """
262         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
263         base_table = _classtype_table(phrase_class, phrase_type)
264         # Index on centroid
265         if not index_exists(self.db_connection, index_prefix + 'centroid'):
266             with self.db_connection.cursor() as db_cursor:
267                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
268                                   .format(Identifier(index_prefix + 'centroid'),
269                                           Identifier(base_table),
270                                           SQL(sql_tablespace)))
271
272         # Index on place_id
273         if not index_exists(self.db_connection, index_prefix + 'place_id'):
274             with self.db_connection.cursor() as db_cursor:
275                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
276                                   .format(Identifier(index_prefix + 'place_id'),
277                                           Identifier(base_table),
278                                           SQL(sql_tablespace)))
279
280     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
281         """
282             Grant access on read to the table place_classtype for the webuser.
283         """
284         table_name = _classtype_table(phrase_class, phrase_type)
285         with self.db_connection.cursor() as db_cursor:
286             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
287                               .format(Identifier(table_name),
288                                       Identifier(self.config.DATABASE_WEBUSER)))
289
290     def _remove_non_existent_tables_from_db(self) -> None:
291         """
292             Remove special phrases which doesn't exist on the wiki anymore.
293             Delete the place_classtype tables.
294         """
295         LOG.warning('Cleaning database...')
296
297         # Delete place_classtype tables corresponding to class/type which
298         # are not on the wiki anymore.
299         drop_tables(self.db_connection, *self.table_phrases_to_delete)
300         for _ in self.table_phrases_to_delete:
301             self.statistics_handler.notify_one_table_deleted()