1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Mix-ins that provide the actual commands for the indexer for various indexing
 
  11 from typing import Any, List
 
  14 from psycopg2 import sql as pysql
 
  15 import psycopg2.extras
 
  17 from nominatim.data.place_info import PlaceInfo
 
  18 from nominatim.tokenizer.base import AbstractAnalyzer
 
  19 from nominatim.db.async_connection import DBConnection
 
  20 from nominatim.typing import Query, DictCursorResult, DictCursorResults, Protocol
 
  22 # pylint: disable=C0111
 
  24 def _mk_valuelist(template: str, num: int) -> pysql.Composed:
 
  25     return pysql.SQL(',').join([pysql.SQL(template)] * num)
 
  27 def _analyze_place(place: DictCursorResult, analyzer: AbstractAnalyzer) -> psycopg2.extras.Json:
 
  28     return psycopg2.extras.Json(analyzer.process_place(PlaceInfo(place)))
 
  31 class Runner(Protocol):
 
  32     def name(self) -> str: ...
 
  33     def sql_count_objects(self) -> Query: ...
 
  34     def sql_get_objects(self) -> Query: ...
 
  35     def get_place_details(self, worker: DBConnection,
 
  36                           ids: DictCursorResults) -> DictCursorResults: ...
 
  37     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None: ...
 
  40 class AbstractPlacexRunner:
 
  41     """ Returns SQL commands for indexing of the placex table.
 
  43     SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
 
  44     UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
 
  46     def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
 
  48         self.analyzer = analyzer
 
  51     @functools.lru_cache(maxsize=1)
 
  52     def _index_sql(self, num_places: int) -> pysql.Composed:
 
  55                 SET indexed_status = 0, address = v.addr, token_info = v.ti,
 
  56                     name = v.name, linked_place_id = v.linked_place_id
 
  57                 FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
 
  59             """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places))
 
  62     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
 
  63         worker.perform("""SELECT place_id, extra.*
 
  64                           FROM placex, LATERAL placex_indexing_prepare(placex) as extra
 
  65                           WHERE place_id IN %s""",
 
  66                        (tuple((p[0] for p in ids)), ))
 
  71     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
 
  72         values: List[Any] = []
 
  74             for field in ('place_id', 'name', 'address', 'linked_place_id'):
 
  75                 values.append(place[field])
 
  76             values.append(_analyze_place(place, self.analyzer))
 
  78         worker.perform(self._index_sql(len(places)), values)
 
  81 class RankRunner(AbstractPlacexRunner):
 
  82     """ Returns SQL commands for indexing one rank within the placex table.
 
  85     def name(self) -> str:
 
  86         return f"rank {self.rank}"
 
  88     def sql_count_objects(self) -> pysql.Composed:
 
  89         return pysql.SQL("""SELECT count(*) FROM placex
 
  90                             WHERE rank_address = {} and indexed_status > 0
 
  91                          """).format(pysql.Literal(self.rank))
 
  93     def sql_get_objects(self) -> pysql.Composed:
 
  94         return self.SELECT_SQL + pysql.SQL(
 
  95             """WHERE indexed_status > 0 and rank_address = {}
 
  96                ORDER BY geometry_sector
 
  97             """).format(pysql.Literal(self.rank))
 
 100 class BoundaryRunner(AbstractPlacexRunner):
 
 101     """ Returns SQL commands for indexing the administrative boundaries
 
 105     def name(self) -> str:
 
 106         return f"boundaries rank {self.rank}"
 
 108     def sql_count_objects(self) -> pysql.Composed:
 
 109         return pysql.SQL("""SELECT count(*) FROM placex
 
 110                             WHERE indexed_status > 0
 
 112                               AND class = 'boundary' and type = 'administrative'
 
 113                          """).format(pysql.Literal(self.rank))
 
 115     def sql_get_objects(self) -> pysql.Composed:
 
 116         return self.SELECT_SQL + pysql.SQL(
 
 117             """WHERE indexed_status > 0 and rank_search = {}
 
 118                      and class = 'boundary' and type = 'administrative'
 
 119                ORDER BY partition, admin_level
 
 120             """).format(pysql.Literal(self.rank))
 
 123 class InterpolationRunner:
 
 124     """ Returns SQL commands for indexing the address interpolation table
 
 125         location_property_osmline.
 
 128     def __init__(self, analyzer: AbstractAnalyzer) -> None:
 
 129         self.analyzer = analyzer
 
 132     def name(self) -> str:
 
 133         return "interpolation lines (location_property_osmline)"
 
 135     def sql_count_objects(self) -> str:
 
 136         return """SELECT count(*) FROM location_property_osmline
 
 137                   WHERE indexed_status > 0"""
 
 139     def sql_get_objects(self) -> str:
 
 140         return """SELECT place_id
 
 141                   FROM location_property_osmline
 
 142                   WHERE indexed_status > 0
 
 143                   ORDER BY geometry_sector"""
 
 146     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
 
 147         worker.perform("""SELECT place_id, get_interpolation_address(address, osm_id) as address
 
 148                           FROM location_property_osmline WHERE place_id IN %s""",
 
 149                        (tuple((p[0] for p in ids)), ))
 
 153     @functools.lru_cache(maxsize=1)
 
 154     def _index_sql(self, num_places: int) -> pysql.Composed:
 
 155         return pysql.SQL("""UPDATE location_property_osmline
 
 156                             SET indexed_status = 0, address = v.addr, token_info = v.ti
 
 157                             FROM (VALUES {}) as v(id, addr, ti)
 
 158                             WHERE place_id = v.id
 
 159                          """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places))
 
 162     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
 
 163         values: List[Any] = []
 
 165             values.extend((place[x] for x in ('place_id', 'address')))
 
 166             values.append(_analyze_place(place, self.analyzer))
 
 168         worker.perform(self._index_sql(len(places)), values)
 
 172 class PostcodeRunner(Runner):
 
 173     """ Provides the SQL commands for indexing the location_postcode table.
 
 176     def name(self) -> str:
 
 177         return "postcodes (location_postcode)"
 
 180     def sql_count_objects(self) -> str:
 
 181         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
 
 184     def sql_get_objects(self) -> str:
 
 185         return """SELECT place_id FROM location_postcode
 
 186                   WHERE indexed_status > 0
 
 187                   ORDER BY country_code, postcode"""
 
 190     def get_place_details(self, worker: DBConnection, ids: DictCursorResults) -> DictCursorResults:
 
 193     def index_places(self, worker: DBConnection, places: DictCursorResults) -> None:
 
 194         worker.perform(pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
 
 195                                     WHERE place_id IN ({})""")
 
 196                        .format(pysql.SQL(',').join((pysql.Literal(i[0]) for i in places))))