]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/special_phrases/sp_importer.py
Added testing to test get classtype pairs in import special phrases
[nominatim.git] / src / nominatim_db / tools / special_phrases / sp_importer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Module containing the class handling the import
9     of the special phrases.
10
11     Phrases are analyzed and imported into the database.
12
13     The phrases already present in the database which are not
14     valids anymore are removed.
15 """
16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
17 import logging
18 import re
19 from psycopg.sql import Identifier, SQL
20
21 from ...typing import Protocol
22 from ...config import Configuration
23 from ...db.connection import Connection, drop_tables, index_exists
24 from .importer_statistics import SpecialPhrasesImporterStatistics
25 from .special_phrase import SpecialPhrase
26 from ...tokenizer.base import AbstractTokenizer
27
28 LOG = logging.getLogger()
29
30
31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
32     """ Return the name of the table for the given class and type.
33     """
34     return f'place_classtype_{phrase_class}_{phrase_type}'
35
36
37 class SpecialPhraseLoader(Protocol):
38     """ Protocol for classes implementing a loader for special phrases.
39     """
40
41     def generate_phrases(self) -> Iterable[SpecialPhrase]:
42         """ Generates all special phrase terms this loader can produce.
43         """
44
45
46 class SPImporter():
47     """
48         Class handling the process of special phrases importation into the database.
49
50         Take a sp loader which load the phrases from an external source.
51     """
52     def __init__(self, config: Configuration, conn: Connection,
53                  sp_loader: SpecialPhraseLoader) -> None:
54         self.config = config
55         self.db_connection = conn
56         self.sp_loader = sp_loader
57         self.statistics_handler = SpecialPhrasesImporterStatistics()
58         self.black_list, self.white_list = self._load_white_and_black_lists()
59         self.sanity_check_pattern = re.compile(r'^\w+$')
60         # This set will contain all existing phrases to be added.
61         # It contains tuples with the following format: (label, class, type, operator)
62         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
63         # This set will contain all existing place_classtype tables which doesn't match any
64         # special phrases class/type on the wiki.
65         self.table_phrases_to_delete: Set[str] = set()
66
67     def get_classtype_pairs(self, min: int = 0) -> Set[Tuple[str, str]]:
68         """
69             Returns list of allowed special phrases from the database,
70             restricting to a list of combinations of classes and types
71             which occur more than a specified amount of times.
72
73             Default value for this, if not specified, is at least once.
74         """
75         db_combinations = set()
76         query = f"""
77         SELECT class AS CLS, type AS typ
78         FROM placex
79         GROUP BY class, type
80         HAVING COUNT(*) > {min}
81         """
82
83         with self.db_connection.cursor() as db_cursor:
84             db_cursor.execute(SQL(query))
85             for row in db_cursor:
86                 db_combinations.add((row[0], row[1]))
87
88         return db_combinations
89
90     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool) -> None:
91         """
92             Iterate through all SpecialPhrases extracted from the
93             loader and import them into the database.
94
95             If should_replace is set to True only the loaded phrases
96             will be kept into the database. All other phrases already
97             in the database will be removed.
98         """
99         LOG.warning('Special phrases importation starting')
100         self._fetch_existing_place_classtype_tables()
101
102         # Store pairs of class/type for further processing
103         class_type_pairs = set()
104
105         for phrase in self.sp_loader.generate_phrases():
106             result = self._process_phrase(phrase)
107             if result:
108                 class_type_pairs.add(result)
109
110         self._create_classtype_table_and_indexes(class_type_pairs)
111         if should_replace:
112             self._remove_non_existent_tables_from_db()
113
114         self.db_connection.commit()
115
116         with tokenizer.name_analyzer() as analyzer:
117             analyzer.update_special_phrases(self.word_phrases, should_replace)
118
119         LOG.warning('Import done.')
120         self.statistics_handler.notify_import_done()
121
122     def _fetch_existing_place_classtype_tables(self) -> None:
123         """
124             Fetch existing place_classtype tables.
125             Fill the table_phrases_to_delete set of the class.
126         """
127         query = """
128             SELECT table_name
129             FROM information_schema.tables
130             WHERE table_schema='public'
131             AND table_name like 'place_classtype_%';
132         """
133         with self.db_connection.cursor() as db_cursor:
134             db_cursor.execute(SQL(query))
135             for row in db_cursor:
136                 self.table_phrases_to_delete.add(row[0])
137
138     def _load_white_and_black_lists(self) \
139             -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
140         """
141             Load white and black lists from phrases-settings.json.
142         """
143         settings = self.config.load_sub_configuration('phrase-settings.json')
144
145         return settings['blackList'], settings['whiteList']
146
147     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
148         """
149             Check sanity of given inputs in case somebody added garbage in the wiki.
150             If a bad class/type is detected the system will exit with an error.
151         """
152         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
153         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
154
155         if not class_matchs or not type_matchs:
156             LOG.warning("Bad class/type: %s=%s. It will not be imported",
157                         phrase.p_class, phrase.p_type)
158             return False
159         return True
160
161     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
162         """
163             Processes the given phrase by checking black and white list
164             and sanity.
165             Return the class/type pair corresponding to the phrase.
166         """
167
168         # blacklisting: disallow certain class/type combinations
169         if phrase.p_class in self.black_list.keys() \
170            and phrase.p_type in self.black_list[phrase.p_class]:
171             return None
172
173         # whitelisting: if class is in whitelist, allow only tags in the list
174         if phrase.p_class in self.white_list.keys() \
175            and phrase.p_type not in self.white_list[phrase.p_class]:
176             return None
177
178         # sanity check, in case somebody added garbage in the wiki
179         if not self._check_sanity(phrase):
180             self.statistics_handler.notify_one_phrase_invalid()
181             return None
182
183         self.word_phrases.add((phrase.p_label, phrase.p_class,
184                                phrase.p_type, phrase.p_operator))
185
186         return (phrase.p_class, phrase.p_type)
187
188     def _create_classtype_table_and_indexes(self,
189                                             class_type_pairs: Iterable[Tuple[str, str]]) -> None:
190         """
191             Create table place_classtype for each given pair.
192             Also create indexes on place_id and centroid.
193         """
194         LOG.warning('Create tables and indexes...')
195
196         sql_tablespace = self.config.TABLESPACE_AUX_DATA
197         if sql_tablespace:
198             sql_tablespace = ' TABLESPACE ' + sql_tablespace
199
200         with self.db_connection.cursor() as db_cursor:
201             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
202
203         allowed_special_phrases = self.get_classtype_pairs()
204
205         for pair in class_type_pairs:
206             phrase_class = pair[0]
207             phrase_type = pair[1]
208
209             if (phrase_class, phrase_type) not in allowed_special_phrases:
210                 LOG.warning("Skipping phrase %s=%s: not in allowed special phrases",
211                             phrase_class, phrase_type)
212                 continue
213
214             table_name = _classtype_table(phrase_class, phrase_type)
215
216             if table_name in self.table_phrases_to_delete:
217                 self.statistics_handler.notify_one_table_ignored()
218                 # Remove this table from the ones to delete as it match a
219                 # class/type still existing on the special phrases of the wiki.
220                 self.table_phrases_to_delete.remove(table_name)
221                 # So don't need to create the table and indexes.
222                 continue
223
224             # Table creation
225             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
226
227             # Indexes creation
228             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
229
230             # Grant access on read to the web user.
231             self._grant_access_to_webuser(phrase_class, phrase_type)
232
233             self.statistics_handler.notify_one_table_created()
234
235         with self.db_connection.cursor() as db_cursor:
236             db_cursor.execute("DROP INDEX idx_placex_classtype")
237
238     def _create_place_classtype_table(self, sql_tablespace: str,
239                                       phrase_class: str, phrase_type: str) -> None:
240         """
241             Create table place_classtype of the given phrase_class/phrase_type
242             if doesn't exit.
243         """
244         table_name = _classtype_table(phrase_class, phrase_type)
245         with self.db_connection.cursor() as cur:
246             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
247                                  SELECT place_id AS place_id,
248                                         st_centroid(geometry) AS centroid
249                                  FROM placex
250                                  WHERE class = %s AND type = %s
251                              """).format(Identifier(table_name), SQL(sql_tablespace)),
252                         (phrase_class, phrase_type))
253
254     def _create_place_classtype_indexes(self, sql_tablespace: str,
255                                         phrase_class: str, phrase_type: str) -> None:
256         """
257             Create indexes on centroid and place_id for the place_classtype table.
258         """
259         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
260         base_table = _classtype_table(phrase_class, phrase_type)
261         # Index on centroid
262         if not index_exists(self.db_connection, index_prefix + 'centroid'):
263             with self.db_connection.cursor() as db_cursor:
264                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
265                                   .format(Identifier(index_prefix + 'centroid'),
266                                           Identifier(base_table),
267                                           SQL(sql_tablespace)))
268
269         # Index on place_id
270         if not index_exists(self.db_connection, index_prefix + 'place_id'):
271             with self.db_connection.cursor() as db_cursor:
272                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
273                                   .format(Identifier(index_prefix + 'place_id'),
274                                           Identifier(base_table),
275                                           SQL(sql_tablespace)))
276
277     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
278         """
279             Grant access on read to the table place_classtype for the webuser.
280         """
281         table_name = _classtype_table(phrase_class, phrase_type)
282         with self.db_connection.cursor() as db_cursor:
283             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
284                               .format(Identifier(table_name),
285                                       Identifier(self.config.DATABASE_WEBUSER)))
286
287     def _remove_non_existent_tables_from_db(self) -> None:
288         """
289             Remove special phrases which doesn't exist on the wiki anymore.
290             Delete the place_classtype tables.
291         """
292         LOG.warning('Cleaning database...')
293
294         # Delete place_classtype tables corresponding to class/type which
295         # are not on the wiki anymore.
296         drop_tables(self.db_connection, *self.table_phrases_to_delete)
297         for _ in self.table_phrases_to_delete:
298             self.statistics_handler.notify_one_table_deleted()