]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/special_phrases/sp_importer.py
Added command line (default 0) min argument for minimum filtering, updated args.py...
[nominatim.git] / src / nominatim_db / tools / special_phrases / sp_importer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Module containing the class handling the import
9     of the special phrases.
10
11     Phrases are analyzed and imported into the database.
12
13     The phrases already present in the database which are not
14     valids anymore are removed.
15 """
16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
17 import logging
18 import re
19 from psycopg.sql import Identifier, SQL
20
21 from ...typing import Protocol
22 from ...config import Configuration
23 from ...db.connection import Connection, drop_tables, index_exists
24 from .importer_statistics import SpecialPhrasesImporterStatistics
25 from .special_phrase import SpecialPhrase
26 from ...tokenizer.base import AbstractTokenizer
27
28 LOG = logging.getLogger()
29
30
31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
32     """ Return the name of the table for the given class and type.
33     """
34     return f'place_classtype_{phrase_class}_{phrase_type}'
35
36
37 class SpecialPhraseLoader(Protocol):
38     """ Protocol for classes implementing a loader for special phrases.
39     """
40
41     def generate_phrases(self) -> Iterable[SpecialPhrase]:
42         """ Generates all special phrase terms this loader can produce.
43         """
44
45
46 class SPImporter():
47     """
48         Class handling the process of special phrases importation into the database.
49
50         Take a sp loader which load the phrases from an external source.
51     """
52     def __init__(self, config: Configuration, conn: Connection,
53                  sp_loader: SpecialPhraseLoader) -> None:
54         self.config = config
55         self.db_connection = conn
56         self.sp_loader = sp_loader
57         self.statistics_handler = SpecialPhrasesImporterStatistics()
58         self.black_list, self.white_list = self._load_white_and_black_lists()
59         self.sanity_check_pattern = re.compile(r'^\w+$')
60         # This set will contain all existing phrases to be added.
61         # It contains tuples with the following format: (label, class, type, operator)
62         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
63         # This set will contain all existing place_classtype tables which doesn't match any
64         # special phrases class/type on the wiki.
65         self.table_phrases_to_delete: Set[str] = set()
66
67     def get_classtype_pairs(self, min: int = 0) -> Set[Tuple[str, str]]:
68         """
69             Returns list of allowed special phrases from the database,
70             restricting to a list of combinations of classes and types
71             which occur more than a specified amount of times.
72
73             Default value for this, if not specified, is at least once.
74         """
75         db_combinations = set()
76         query = f"""
77         SELECT class AS CLS, type AS typ
78         FROM placex
79         GROUP BY class, type
80         HAVING COUNT(*) > {min}
81         """
82
83         with self.db_connection.cursor() as db_cursor:
84             db_cursor.execute(SQL(query))
85             for row in db_cursor:
86                 db_combinations.add((row[0], row[1]))
87
88         return db_combinations
89
90     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool, min: int) -> None:
91         """
92             Iterate through all SpecialPhrases extracted from the
93             loader and import them into the database.
94
95             If should_replace is set to True only the loaded phrases
96             will be kept into the database. All other phrases already
97             in the database will be removed.
98         """
99         LOG.warning('Special phrases importation starting')
100         self._fetch_existing_place_classtype_tables()
101
102         # Store pairs of class/type for further processing
103         class_type_pairs = set()
104
105         for phrase in self.sp_loader.generate_phrases():
106             result = self._process_phrase(phrase)
107             if result:
108                 class_type_pairs.add(result)
109
110         self._create_classtype_table_and_indexes(class_type_pairs, min)
111         if should_replace:
112             self._remove_non_existent_tables_from_db()
113
114         self.db_connection.commit()
115
116         with tokenizer.name_analyzer() as analyzer:
117             analyzer.update_special_phrases(self.word_phrases, should_replace)
118
119         LOG.warning('Import done.')
120         self.statistics_handler.notify_import_done()
121
122     def _fetch_existing_place_classtype_tables(self) -> None:
123         """
124             Fetch existing place_classtype tables.
125             Fill the table_phrases_to_delete set of the class.
126         """
127         query = """
128             SELECT table_name
129             FROM information_schema.tables
130             WHERE table_schema='public'
131             AND table_name like 'place_classtype_%';
132         """
133         with self.db_connection.cursor() as db_cursor:
134             db_cursor.execute(SQL(query))
135             for row in db_cursor:
136                 self.table_phrases_to_delete.add(row[0])
137
138     def _load_white_and_black_lists(self) \
139             -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
140         """
141             Load white and black lists from phrases-settings.json.
142         """
143         settings = self.config.load_sub_configuration('phrase-settings.json')
144
145         return settings['blackList'], settings['whiteList']
146
147     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
148         """
149             Check sanity of given inputs in case somebody added garbage in the wiki.
150             If a bad class/type is detected the system will exit with an error.
151         """
152         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
153         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
154
155         if not class_matchs or not type_matchs:
156             LOG.warning("Bad class/type: %s=%s. It will not be imported",
157                         phrase.p_class, phrase.p_type)
158             return False
159         return True
160
161     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
162         """
163             Processes the given phrase by checking black and white list
164             and sanity.
165             Return the class/type pair corresponding to the phrase.
166         """
167
168         # blacklisting: disallow certain class/type combinations
169         if phrase.p_class in self.black_list.keys() \
170            and phrase.p_type in self.black_list[phrase.p_class]:
171             return None
172
173         # whitelisting: if class is in whitelist, allow only tags in the list
174         if phrase.p_class in self.white_list.keys() \
175            and phrase.p_type not in self.white_list[phrase.p_class]:
176             return None
177
178         # sanity check, in case somebody added garbage in the wiki
179         if not self._check_sanity(phrase):
180             self.statistics_handler.notify_one_phrase_invalid()
181             return None
182
183         self.word_phrases.add((phrase.p_label, phrase.p_class,
184                                phrase.p_type, phrase.p_operator))
185
186         return (phrase.p_class, phrase.p_type)
187
188     def _create_classtype_table_and_indexes(self,
189                                             class_type_pairs: Iterable[Tuple[str, str]],
190                                             min: int) -> None:
191         """
192             Create table place_classtype for each given pair.
193             Also create indexes on place_id and centroid.
194         """
195         LOG.warning('Create tables and indexes...')
196
197         sql_tablespace = self.config.TABLESPACE_AUX_DATA
198         if sql_tablespace:
199             sql_tablespace = ' TABLESPACE ' + sql_tablespace
200
201         with self.db_connection.cursor() as db_cursor:
202             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
203
204         allowed_special_phrases = self.get_classtype_pairs(min)
205
206         for pair in class_type_pairs:
207             phrase_class = pair[0]
208             phrase_type = pair[1]
209
210             if (phrase_class, phrase_type) not in allowed_special_phrases:
211                 LOG.warning("Skipping phrase %s=%s: not in allowed special phrases",
212                             phrase_class, phrase_type)
213                 continue
214
215             table_name = _classtype_table(phrase_class, phrase_type)
216
217             if table_name in self.table_phrases_to_delete:
218                 self.statistics_handler.notify_one_table_ignored()
219                 # Remove this table from the ones to delete as it match a
220                 # class/type still existing on the special phrases of the wiki.
221                 self.table_phrases_to_delete.remove(table_name)
222                 # So don't need to create the table and indexes.
223                 continue
224
225             # Table creation
226             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
227
228             # Indexes creation
229             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
230
231             # Grant access on read to the web user.
232             self._grant_access_to_webuser(phrase_class, phrase_type)
233
234             self.statistics_handler.notify_one_table_created()
235
236         with self.db_connection.cursor() as db_cursor:
237             db_cursor.execute("DROP INDEX idx_placex_classtype")
238
239     def _create_place_classtype_table(self, sql_tablespace: str,
240                                       phrase_class: str, phrase_type: str) -> None:
241         """
242             Create table place_classtype of the given phrase_class/phrase_type
243             if doesn't exit.
244         """
245         table_name = _classtype_table(phrase_class, phrase_type)
246         with self.db_connection.cursor() as cur:
247             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
248                                  SELECT place_id AS place_id,
249                                         st_centroid(geometry) AS centroid
250                                  FROM placex
251                                  WHERE class = %s AND type = %s
252                              """).format(Identifier(table_name), SQL(sql_tablespace)),
253                         (phrase_class, phrase_type))
254
255     def _create_place_classtype_indexes(self, sql_tablespace: str,
256                                         phrase_class: str, phrase_type: str) -> None:
257         """
258             Create indexes on centroid and place_id for the place_classtype table.
259         """
260         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
261         base_table = _classtype_table(phrase_class, phrase_type)
262         # Index on centroid
263         if not index_exists(self.db_connection, index_prefix + 'centroid'):
264             with self.db_connection.cursor() as db_cursor:
265                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
266                                   .format(Identifier(index_prefix + 'centroid'),
267                                           Identifier(base_table),
268                                           SQL(sql_tablespace)))
269
270         # Index on place_id
271         if not index_exists(self.db_connection, index_prefix + 'place_id'):
272             with self.db_connection.cursor() as db_cursor:
273                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
274                                   .format(Identifier(index_prefix + 'place_id'),
275                                           Identifier(base_table),
276                                           SQL(sql_tablespace)))
277
278     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
279         """
280             Grant access on read to the table place_classtype for the webuser.
281         """
282         table_name = _classtype_table(phrase_class, phrase_type)
283         with self.db_connection.cursor() as db_cursor:
284             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
285                               .format(Identifier(table_name),
286                                       Identifier(self.config.DATABASE_WEBUSER)))
287
288     def _remove_non_existent_tables_from_db(self) -> None:
289         """
290             Remove special phrases which doesn't exist on the wiki anymore.
291             Delete the place_classtype tables.
292         """
293         LOG.warning('Cleaning database...')
294
295         # Delete place_classtype tables corresponding to class/type which
296         # are not on the wiki anymore.
297         drop_tables(self.db_connection, *self.table_phrases_to_delete)
298         for _ in self.table_phrases_to_delete:
299             self.statistics_handler.notify_one_table_deleted()