]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/special_phrases/sp_importer.py
Added default min = 0 argument for private functions
[nominatim.git] / src / nominatim_db / tools / special_phrases / sp_importer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Module containing the class handling the import
9     of the special phrases.
10
11     Phrases are analyzed and imported into the database.
12
13     The phrases already present in the database which are not
14     valids anymore are removed.
15 """
16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
17 import logging
18 import re
19 from psycopg.sql import Identifier, SQL
20
21 from ...typing import Protocol
22 from ...config import Configuration
23 from ...db.connection import Connection, drop_tables, index_exists
24 from .importer_statistics import SpecialPhrasesImporterStatistics
25 from .special_phrase import SpecialPhrase
26 from ...tokenizer.base import AbstractTokenizer
27
28 LOG = logging.getLogger()
29
30
31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
32     """ Return the name of the table for the given class and type.
33     """
34     return f'place_classtype_{phrase_class}_{phrase_type}'
35
36
37 class SpecialPhraseLoader(Protocol):
38     """ Protocol for classes implementing a loader for special phrases.
39     """
40
41     def generate_phrases(self) -> Iterable[SpecialPhrase]:
42         """ Generates all special phrase terms this loader can produce.
43         """
44
45
46 class SPImporter():
47     """
48         Class handling the process of special phrases importation into the database.
49
50         Take a sp loader which load the phrases from an external source.
51     """
52     def __init__(self, config: Configuration, conn: Connection,
53                  sp_loader: SpecialPhraseLoader) -> None:
54         self.config = config
55         self.db_connection = conn
56         self.sp_loader = sp_loader
57         self.statistics_handler = SpecialPhrasesImporterStatistics()
58         self.black_list, self.white_list = self._load_white_and_black_lists()
59         self.sanity_check_pattern = re.compile(r'^\w+$')
60         # This set will contain all existing phrases to be added.
61         # It contains tuples with the following format: (label, class, type, operator)
62         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
63         # This set will contain all existing place_classtype tables which doesn't match any
64         # special phrases class/type on the wiki.
65         self.table_phrases_to_delete: Set[str] = set()
66
67     def get_classtype_pairs(self, min: int = 0) -> Set[Tuple[str, str]]:
68         """
69             Returns list of allowed special phrases from the database,
70             restricting to a list of combinations of classes and types
71             which occur equal to or more than a specified amount of times.
72
73             Default value for this is 0, which allows everything in database.
74         """
75         db_combinations = set()
76
77         query = f"""
78         SELECT class AS CLS, type AS typ
79         FROM placex
80         GROUP BY class, type
81         HAVING COUNT(*) >= {min}
82         """
83
84         with self.db_connection.cursor() as db_cursor:
85             db_cursor.execute(SQL(query))
86             for row in db_cursor:
87                 db_combinations.add((row[0], row[1]))
88
89         return db_combinations
90
91     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool,
92                        min: int = 0) -> None:
93         """
94             Iterate through all SpecialPhrases extracted from the
95             loader and import them into the database.
96
97             If should_replace is set to True only the loaded phrases
98             will be kept into the database. All other phrases already
99             in the database will be removed.
100         """
101         LOG.warning('Special phrases importation starting')
102         self._fetch_existing_place_classtype_tables()
103
104         # Store pairs of class/type for further processing
105         class_type_pairs = set()
106
107         for phrase in self.sp_loader.generate_phrases():
108             result = self._process_phrase(phrase)
109             if result:
110                 class_type_pairs.add(result)
111
112         self._create_classtype_table_and_indexes(class_type_pairs, min)
113         if should_replace:
114             self._remove_non_existent_tables_from_db()
115
116         self.db_connection.commit()
117
118         with tokenizer.name_analyzer() as analyzer:
119             analyzer.update_special_phrases(self.word_phrases, should_replace)
120
121         LOG.warning('Import done.')
122         self.statistics_handler.notify_import_done()
123
124     def _fetch_existing_place_classtype_tables(self) -> None:
125         """
126             Fetch existing place_classtype tables.
127             Fill the table_phrases_to_delete set of the class.
128         """
129         query = """
130             SELECT table_name
131             FROM information_schema.tables
132             WHERE table_schema='public'
133             AND table_name like 'place_classtype_%';
134         """
135         with self.db_connection.cursor() as db_cursor:
136             db_cursor.execute(SQL(query))
137             for row in db_cursor:
138                 self.table_phrases_to_delete.add(row[0])
139
140     def _load_white_and_black_lists(self) \
141             -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
142         """
143             Load white and black lists from phrases-settings.json.
144         """
145         settings = self.config.load_sub_configuration('phrase-settings.json')
146
147         return settings['blackList'], settings['whiteList']
148
149     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
150         """
151             Check sanity of given inputs in case somebody added garbage in the wiki.
152             If a bad class/type is detected the system will exit with an error.
153         """
154         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
155         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
156
157         if not class_matchs or not type_matchs:
158             LOG.warning("Bad class/type: %s=%s. It will not be imported",
159                         phrase.p_class, phrase.p_type)
160             return False
161         return True
162
163     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
164         """
165             Processes the given phrase by checking black and white list
166             and sanity.
167             Return the class/type pair corresponding to the phrase.
168         """
169
170         # blacklisting: disallow certain class/type combinations
171         if phrase.p_class in self.black_list.keys() \
172            and phrase.p_type in self.black_list[phrase.p_class]:
173             return None
174
175         # whitelisting: if class is in whitelist, allow only tags in the list
176         if phrase.p_class in self.white_list.keys() \
177            and phrase.p_type not in self.white_list[phrase.p_class]:
178             return None
179
180         # sanity check, in case somebody added garbage in the wiki
181         if not self._check_sanity(phrase):
182             self.statistics_handler.notify_one_phrase_invalid()
183             return None
184
185         self.word_phrases.add((phrase.p_label, phrase.p_class,
186                                phrase.p_type, phrase.p_operator))
187
188         return (phrase.p_class, phrase.p_type)
189
190     def _create_classtype_table_and_indexes(self,
191                                             class_type_pairs: Iterable[Tuple[str, str]],
192                                             min: int = 0) -> None:
193         """
194             Create table place_classtype for each given pair.
195             Also create indexes on place_id and centroid.
196         """
197         LOG.warning('Create tables and indexes...')
198
199         sql_tablespace = self.config.TABLESPACE_AUX_DATA
200         if sql_tablespace:
201             sql_tablespace = ' TABLESPACE ' + sql_tablespace
202
203         with self.db_connection.cursor() as db_cursor:
204             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
205
206         allowed_special_phrases = self.get_classtype_pairs(min)
207
208         for pair in class_type_pairs:
209             phrase_class = pair[0]
210             phrase_type = pair[1]
211
212             # Will only filter if min is not 0
213             if min and (phrase_class, phrase_type) not in allowed_special_phrases:
214                 LOG.warning("Skipping phrase %s=%s: not in allowed special phrases",
215                             phrase_class, phrase_type)
216                 continue
217
218             table_name = _classtype_table(phrase_class, phrase_type)
219
220             if table_name in self.table_phrases_to_delete:
221                 self.statistics_handler.notify_one_table_ignored()
222                 # Remove this table from the ones to delete as it match a
223                 # class/type still existing on the special phrases of the wiki.
224                 self.table_phrases_to_delete.remove(table_name)
225                 # So don't need to create the table and indexes.
226                 continue
227
228             # Table creation
229             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
230
231             # Indexes creation
232             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
233
234             # Grant access on read to the web user.
235             self._grant_access_to_webuser(phrase_class, phrase_type)
236
237             self.statistics_handler.notify_one_table_created()
238
239         with self.db_connection.cursor() as db_cursor:
240             db_cursor.execute("DROP INDEX idx_placex_classtype")
241
242     def _create_place_classtype_table(self, sql_tablespace: str,
243                                       phrase_class: str, phrase_type: str) -> None:
244         """
245             Create table place_classtype of the given phrase_class/phrase_type
246             if doesn't exit.
247         """
248         table_name = _classtype_table(phrase_class, phrase_type)
249         with self.db_connection.cursor() as cur:
250             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
251                                  SELECT place_id AS place_id,
252                                         st_centroid(geometry) AS centroid
253                                  FROM placex
254                                  WHERE class = %s AND type = %s
255                              """).format(Identifier(table_name), SQL(sql_tablespace)),
256                         (phrase_class, phrase_type))
257
258     def _create_place_classtype_indexes(self, sql_tablespace: str,
259                                         phrase_class: str, phrase_type: str) -> None:
260         """
261             Create indexes on centroid and place_id for the place_classtype table.
262         """
263         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
264         base_table = _classtype_table(phrase_class, phrase_type)
265         # Index on centroid
266         if not index_exists(self.db_connection, index_prefix + 'centroid'):
267             with self.db_connection.cursor() as db_cursor:
268                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
269                                   .format(Identifier(index_prefix + 'centroid'),
270                                           Identifier(base_table),
271                                           SQL(sql_tablespace)))
272
273         # Index on place_id
274         if not index_exists(self.db_connection, index_prefix + 'place_id'):
275             with self.db_connection.cursor() as db_cursor:
276                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
277                                   .format(Identifier(index_prefix + 'place_id'),
278                                           Identifier(base_table),
279                                           SQL(sql_tablespace)))
280
281     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
282         """
283             Grant access on read to the table place_classtype for the webuser.
284         """
285         table_name = _classtype_table(phrase_class, phrase_type)
286         with self.db_connection.cursor() as db_cursor:
287             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
288                               .format(Identifier(table_name),
289                                       Identifier(self.config.DATABASE_WEBUSER)))
290
291     def _remove_non_existent_tables_from_db(self) -> None:
292         """
293             Remove special phrases which doesn't exist on the wiki anymore.
294             Delete the place_classtype tables.
295         """
296         LOG.warning('Cleaning database...')
297
298         # Delete place_classtype tables corresponding to class/type which
299         # are not on the wiki anymore.
300         drop_tables(self.db_connection, *self.table_phrases_to_delete)
301         for _ in self.table_phrases_to_delete:
302             self.statistics_handler.notify_one_table_deleted()