1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8     Module containing the class handling the import
 
   9     of the special phrases.
 
  11     Phrases are analyzed and imported into the database.
 
  13     The phrases already present in the database which are not
 
  14     valids anymore are removed.
 
  16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
 
  20 from psycopg2.sql import Identifier, SQL
 
  22 from nominatim.config import Configuration
 
  23 from nominatim.db.connection import Connection
 
  24 from nominatim.tools.special_phrases.importer_statistics import SpecialPhrasesImporterStatistics
 
  25 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
 
  26 from nominatim.tokenizer.base import AbstractTokenizer
 
  27 from nominatim.typing import Protocol
 
  29 LOG = logging.getLogger()
 
  31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
 
  32     """ Return the name of the table for the given class and type.
 
  34     return f'place_classtype_{phrase_class}_{phrase_type}'
 
  37 class SpecialPhraseLoader(Protocol):
 
  38     """ Protocol for classes implementing a loader for special phrases.
 
  41     def generate_phrases(self) -> Iterable[SpecialPhrase]:
 
  42         """ Generates all special phrase terms this loader can produce.
 
  47     # pylint: disable-msg=too-many-instance-attributes
 
  49         Class handling the process of special phrases importation into the database.
 
  51         Take a sp loader which load the phrases from an external source.
 
  53     def __init__(self, config: Configuration, conn: Connection,
 
  54                  sp_loader: SpecialPhraseLoader) -> None:
 
  56         self.db_connection = conn
 
  57         self.sp_loader = sp_loader
 
  58         self.statistics_handler = SpecialPhrasesImporterStatistics()
 
  59         self.black_list, self.white_list = self._load_white_and_black_lists()
 
  60         self.sanity_check_pattern = re.compile(r'^\w+$')
 
  61         # This set will contain all existing phrases to be added.
 
  62         # It contains tuples with the following format: (label, class, type, operator)
 
  63         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
 
  64         # This set will contain all existing place_classtype tables which doesn't match any
 
  65         # special phrases class/type on the wiki.
 
  66         self.table_phrases_to_delete: Set[str] = set()
 
  68     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool) -> None:
 
  70             Iterate through all SpecialPhrases extracted from the
 
  71             loader and import them into the database.
 
  73             If should_replace is set to True only the loaded phrases
 
  74             will be kept into the database. All other phrases already
 
  75             in the database will be removed.
 
  77         LOG.warning('Special phrases importation starting')
 
  78         self._fetch_existing_place_classtype_tables()
 
  80         # Store pairs of class/type for further processing
 
  81         class_type_pairs = set()
 
  83         for phrase in self.sp_loader.generate_phrases():
 
  84             result = self._process_phrase(phrase)
 
  86                 class_type_pairs.add(result)
 
  88         self._create_classtype_table_and_indexes(class_type_pairs)
 
  90             self._remove_non_existent_tables_from_db()
 
  91         self.db_connection.commit()
 
  93         with tokenizer.name_analyzer() as analyzer:
 
  94             analyzer.update_special_phrases(self.word_phrases, should_replace)
 
  96         LOG.warning('Import done.')
 
  97         self.statistics_handler.notify_import_done()
 
 100     def _fetch_existing_place_classtype_tables(self) -> None:
 
 102             Fetch existing place_classtype tables.
 
 103             Fill the table_phrases_to_delete set of the class.
 
 107             FROM information_schema.tables
 
 108             WHERE table_schema='public'
 
 109             AND table_name like 'place_classtype_%';
 
 111         with self.db_connection.cursor() as db_cursor:
 
 112             db_cursor.execute(SQL(query))
 
 113             for row in db_cursor:
 
 114                 self.table_phrases_to_delete.add(row[0])
 
 116     def _load_white_and_black_lists(self) \
 
 117           -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
 
 119             Load white and black lists from phrases-settings.json.
 
 121         settings = self.config.load_sub_configuration('phrase-settings.json')
 
 123         return settings['blackList'], settings['whiteList']
 
 125     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
 
 127             Check sanity of given inputs in case somebody added garbage in the wiki.
 
 128             If a bad class/type is detected the system will exit with an error.
 
 130         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
 
 131         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
 
 133         if not class_matchs or not type_matchs:
 
 134             LOG.warning("Bad class/type: %s=%s. It will not be imported",
 
 135                         phrase.p_class, phrase.p_type)
 
 139     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
 
 141             Processes the given phrase by checking black and white list
 
 143             Return the class/type pair corresponding to the phrase.
 
 146         # blacklisting: disallow certain class/type combinations
 
 147         if phrase.p_class in self.black_list.keys() \
 
 148            and phrase.p_type in self.black_list[phrase.p_class]:
 
 151         # whitelisting: if class is in whitelist, allow only tags in the list
 
 152         if phrase.p_class in self.white_list.keys() \
 
 153            and phrase.p_type not in self.white_list[phrase.p_class]:
 
 156         # sanity check, in case somebody added garbage in the wiki
 
 157         if not self._check_sanity(phrase):
 
 158             self.statistics_handler.notify_one_phrase_invalid()
 
 161         self.word_phrases.add((phrase.p_label, phrase.p_class,
 
 162                                phrase.p_type, phrase.p_operator))
 
 164         return (phrase.p_class, phrase.p_type)
 
 167     def _create_classtype_table_and_indexes(self,
 
 168                                             class_type_pairs: Iterable[Tuple[str, str]]) -> None:
 
 170             Create table place_classtype for each given pair.
 
 171             Also create indexes on place_id and centroid.
 
 173         LOG.warning('Create tables and indexes...')
 
 175         sql_tablespace = self.config.TABLESPACE_AUX_DATA
 
 177             sql_tablespace = ' TABLESPACE ' + sql_tablespace
 
 179         with self.db_connection.cursor() as db_cursor:
 
 180             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
 
 182         for pair in class_type_pairs:
 
 183             phrase_class = pair[0]
 
 184             phrase_type = pair[1]
 
 186             table_name = _classtype_table(phrase_class, phrase_type)
 
 188             if table_name in self.table_phrases_to_delete:
 
 189                 self.statistics_handler.notify_one_table_ignored()
 
 190                 # Remove this table from the ones to delete as it match a
 
 191                 # class/type still existing on the special phrases of the wiki.
 
 192                 self.table_phrases_to_delete.remove(table_name)
 
 193                 # So don't need to create the table and indexes.
 
 197             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
 
 200             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
 
 202             # Grant access on read to the web user.
 
 203             self._grant_access_to_webuser(phrase_class, phrase_type)
 
 205             self.statistics_handler.notify_one_table_created()
 
 207         with self.db_connection.cursor() as db_cursor:
 
 208             db_cursor.execute("DROP INDEX idx_placex_classtype")
 
 211     def _create_place_classtype_table(self, sql_tablespace: str,
 
 212                                       phrase_class: str, phrase_type: str) -> None:
 
 214             Create table place_classtype of the given phrase_class/phrase_type
 
 217         table_name = _classtype_table(phrase_class, phrase_type)
 
 218         with self.db_connection.cursor() as cur:
 
 219             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
 
 220                                  SELECT place_id AS place_id,
 
 221                                         st_centroid(geometry) AS centroid
 
 223                                  WHERE class = %s AND type = %s
 
 224                              """).format(Identifier(table_name), SQL(sql_tablespace)),
 
 225                         (phrase_class, phrase_type))
 
 228     def _create_place_classtype_indexes(self, sql_tablespace: str,
 
 229                                         phrase_class: str, phrase_type: str) -> None:
 
 231             Create indexes on centroid and place_id for the place_classtype table.
 
 233         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
 
 234         base_table = _classtype_table(phrase_class, phrase_type)
 
 236         if not self.db_connection.index_exists(index_prefix + 'centroid'):
 
 237             with self.db_connection.cursor() as db_cursor:
 
 238                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
 
 239                                   .format(Identifier(index_prefix + 'centroid'),
 
 240                                           Identifier(base_table),
 
 241                                           SQL(sql_tablespace)))
 
 244         if not self.db_connection.index_exists(index_prefix + 'place_id'):
 
 245             with self.db_connection.cursor() as db_cursor:
 
 246                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
 
 247                                   .format(Identifier(index_prefix + 'place_id'),
 
 248                                           Identifier(base_table),
 
 249                                           SQL(sql_tablespace)))
 
 252     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
 
 254             Grant access on read to the table place_classtype for the webuser.
 
 256         table_name = _classtype_table(phrase_class, phrase_type)
 
 257         with self.db_connection.cursor() as db_cursor:
 
 258             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
 
 259                               .format(Identifier(table_name),
 
 260                                       Identifier(self.config.DATABASE_WEBUSER)))
 
 262     def _remove_non_existent_tables_from_db(self) -> None:
 
 264             Remove special phrases which doesn't exist on the wiki anymore.
 
 265             Delete the place_classtype tables.
 
 267         LOG.warning('Cleaning database...')
 
 269         # Delete place_classtype tables corresponding to class/type which
 
 270         # are not on the wiki anymore.
 
 271         with self.db_connection.cursor() as db_cursor:
 
 272             for table in self.table_phrases_to_delete:
 
 273                 self.statistics_handler.notify_one_table_deleted()
 
 274                 db_cursor.drop_table(table)