1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Functions for bringing auxiliary data in the database up-to-date.
 
  10 from typing import MutableSequence, Tuple, Any, Mapping, Sequence, List
 
  14 from pathlib import Path
 
  16 from psycopg import sql as pysql
 
  18 from ..config import Configuration
 
  19 from ..db.connection import Connection, connect, drop_tables
 
  20 from ..db.utils import execute_file
 
  21 from ..db.sql_preprocessor import SQLPreprocessor
 
  23 LOG = logging.getLogger()
 
  25 OSM_TYPE = {'N': 'node', 'W': 'way', 'R': 'relation'}
 
  28 def _add_address_level_rows_from_entry(rows: MutableSequence[Tuple[Any, ...]],
 
  29                                        entry: Mapping[str, Any]) -> None:
 
  30     """ Converts a single entry from the JSON format for address rank
 
  31         descriptions into a flat format suitable for inserting into a
 
  32         PostgreSQL table and adds these lines to `rows`.
 
  34     countries = entry.get('countries') or (None, )
 
  35     for key, values in entry['tags'].items():
 
  36         for value, ranks in values.items():
 
  37             if isinstance(ranks, list):
 
  38                 rank_search, rank_address = ranks
 
  40                 rank_search = rank_address = ranks
 
  43             for country in countries:
 
  44                 rows.append((country, key, value, rank_search, rank_address))
 
  47 def load_address_levels(conn: Connection, table: str, levels: Sequence[Mapping[str, Any]]) -> None:
 
  48     """ Replace the `address_levels` table with the contents of `levels'.
 
  50         A new table is created any previously existing table is dropped.
 
  51         The table has the following columns:
 
  52             country, class, type, rank_search, rank_address
 
  54     rows: List[Tuple[Any, ...]] = []
 
  56         _add_address_level_rows_from_entry(rows, entry)
 
  58     drop_tables(conn, table)
 
  60     with conn.cursor() as cur:
 
  61         cur.execute(pysql.SQL("""CREATE TABLE {} (
 
  62                                         country_code varchar(2),
 
  66                                         rank_address SMALLINT)
 
  67                               """).format(pysql.Identifier(table)))
 
  69         cur.executemany(pysql.SQL("INSERT INTO {} VALUES (%s, %s, %s, %s, %s)")
 
  70                              .format(pysql.Identifier(table)), rows)
 
  72         cur.execute(pysql.SQL('CREATE UNIQUE INDEX ON {} (country_code, class, type)')
 
  73                     .format(pysql.Identifier(table)))
 
  78 def load_address_levels_from_config(conn: Connection, config: Configuration) -> None:
 
  79     """ Replace the `address_levels` table with the content as
 
  80         defined in the given configuration. Uses the parameter
 
  81         NOMINATIM_ADDRESS_LEVEL_CONFIG to determine the location of the
 
  84     cfg = config.load_sub_configuration('', config='ADDRESS_LEVEL_CONFIG')
 
  85     load_address_levels(conn, 'address_levels', cfg)
 
  88 def create_functions(conn: Connection, config: Configuration,
 
  89                      enable_diff_updates: bool = True,
 
  90                      enable_debug: bool = False) -> None:
 
  91     """ (Re)create the PL/pgSQL functions.
 
  93     sql = SQLPreprocessor(conn, config)
 
  95     sql.run_sql_file(conn, 'functions.sql',
 
  96                      disable_diff_updates=not enable_diff_updates,
 
 100 def import_wikipedia_articles(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
 
 101     """ Replaces the wikipedia importance tables with new data.
 
 102         The import is run in a single transaction so that the new data
 
 103         is replace seamlessly.
 
 105         Returns 0 if all was well and 1 if the importance file could not
 
 106         be found. Throws an exception if there was an error reading the file.
 
 108     if import_importance_csv(dsn, data_path / 'wikimedia-importance.csv.gz') == 0 \
 
 109        or import_importance_sql(dsn, data_path / 'wikimedia-importance.sql.gz',
 
 116 def import_importance_csv(dsn: str, data_file: Path) -> int:
 
 117     """ Replace wikipedia importance table with data from a
 
 120         The file must be a gzipped CSV and have the following columns:
 
 121         language, title, importance, wikidata_id
 
 123         Other columns may be present but will be ignored.
 
 125     if not data_file.exists():
 
 128     # Only import the first occurrence of a wikidata ID.
 
 129     # This keeps indexes and table small.
 
 132     with connect(dsn) as conn:
 
 133         drop_tables(conn, 'wikipedia_article', 'wikipedia_redirect', 'wikimedia_importance')
 
 134         with conn.cursor() as cur:
 
 135             cur.execute("""CREATE TABLE wikimedia_importance (
 
 136                              language TEXT NOT NULL,
 
 138                              importance double precision NOT NULL,
 
 142             copy_cmd = """COPY wikimedia_importance(language, title, importance, wikidata)
 
 144             with gzip.open(str(data_file), 'rt') as fd, cur.copy(copy_cmd) as copy:
 
 145                 for row in csv.DictReader(fd, delimiter='\t', quotechar='|'):
 
 146                     wd_id = int(row['wikidata_id'][1:])
 
 147                     copy.write_row((row['language'],
 
 150                                     None if wd_id in wd_done else row['wikidata_id']))
 
 153             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_title
 
 154                            ON wikimedia_importance (title)""")
 
 155             cur.execute("""CREATE INDEX IF NOT EXISTS idx_wikimedia_importance_wikidata
 
 156                            ON wikimedia_importance (wikidata)
 
 157                            WHERE wikidata is not null""")
 
 164 def import_importance_sql(dsn: str, data_file: Path, ignore_errors: bool) -> int:
 
 165     """ Replace wikipedia importance table with data from an SQL file.
 
 167     if not data_file.exists():
 
 171                   DROP TABLE IF EXISTS "wikipedia_article";
 
 172                   DROP TABLE IF EXISTS "wikipedia_redirect";
 
 173                   DROP TABLE IF EXISTS "wikipedia_importance";
 
 176     execute_file(dsn, data_file, ignore_errors=ignore_errors,
 
 177                  pre_code=pre_code, post_code=post_code)
 
 182 def import_secondary_importance(dsn: str, data_path: Path, ignore_errors: bool = False) -> int:
 
 183     """ Replaces the secondary importance raster data table with new data.
 
 185         Returns 0 if all was well and 1 if the raster SQL file could not
 
 186         be found. Throws an exception if there was an error reading the file.
 
 188     datafile = data_path / 'secondary_importance.sql.gz'
 
 189     if not datafile.exists():
 
 192     execute_file(dsn, datafile, ignore_errors=ignore_errors)
 
 197 def recompute_importance(conn: Connection) -> None:
 
 198     """ Recompute wikipedia links and importance for all entries in placex.
 
 199         This is a long-running operations that must not be executed in
 
 200         parallel with updates.
 
 202     with conn.cursor() as cur:
 
 203         cur.execute('ALTER TABLE placex DISABLE TRIGGER ALL')
 
 205             UPDATE placex SET (wikipedia, importance) =
 
 206                (SELECT wikipedia, importance
 
 207                 FROM compute_importance(extratags, country_code, rank_search, centroid))
 
 210             UPDATE placex s SET wikipedia = d.wikipedia, importance = d.importance
 
 212              WHERE s.place_id = d.linked_place_id and d.wikipedia is not null
 
 213                    and (s.wikipedia is null or s.importance < d.importance);
 
 216             UPDATE search_name s SET importance = p.importance
 
 218              WHERE s.place_id = p.place_id AND s.importance != p.importance
 
 221         cur.execute('ALTER TABLE placex ENABLE TRIGGER ALL')
 
 225 def invalidate_osm_object(osm_type: str, osm_id: int, conn: Connection,
 
 226                           recursive: bool = True) -> None:
 
 227     """ Mark the given OSM object for reindexing. When 'recursive' is set
 
 228         to True (the default), then all dependent objects are marked for
 
 231         'osm_type' must be on of 'N' (node), 'W' (way) or 'R' (relation).
 
 232         If the given object does not exist, then nothing happens.
 
 234     assert osm_type in ('N', 'R', 'W')
 
 236     LOG.warning("Invalidating OSM %s %s%s.",
 
 237                 OSM_TYPE[osm_type], osm_id,
 
 238                 ' and its dependent places' if recursive else '')
 
 240     with conn.cursor() as cur:
 
 242             sql = """SELECT place_force_update(place_id)
 
 243                      FROM placex WHERE osm_type = %s and osm_id = %s"""
 
 245             sql = """UPDATE placex SET indexed_status = 2
 
 246                      WHERE osm_type = %s and osm_id = %s"""
 
 248         cur.execute(sql, (osm_type, osm_id))