1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
"""
Mix-ins that provide the actual commands for the indexer for various indexing
tasks.
"""
 
  11 from typing import Any, Sequence
 
  13 from psycopg import sql as pysql
 
  14 from psycopg.abc import Query
 
  15 from psycopg.rows import DictRow
 
  16 from psycopg.types.json import Json
 
  18 from ..typing import Protocol
 
  19 from ..data.place_info import PlaceInfo
 
  20 from ..tokenizer.base import AbstractAnalyzer
 
  23 def _mk_valuelist(template: str, num: int) -> pysql.Composed:
 
  24     return pysql.SQL(',').join([pysql.SQL(template)] * num)
 
  27 def _analyze_place(place: DictRow, analyzer: AbstractAnalyzer) -> Json:
 
  28     return Json(analyzer.process_place(PlaceInfo(place)))
 
  31 class Runner(Protocol):
 
  32     def name(self) -> str: ...
 
  33     def sql_count_objects(self) -> Query: ...
 
  34     def sql_get_objects(self) -> Query: ...
 
  35     def index_places_query(self, batch_size: int) -> Query: ...
 
  36     def index_places_params(self, place: DictRow) -> Sequence[Any]: ...
 
  39 SELECT_SQL = pysql.SQL("""SELECT place_id, extra.*
 
  40                           FROM (SELECT * FROM placex {}) as px,
 
  41                           LATERAL placex_indexing_prepare(px) as extra """)
 
  42 UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"
 
  45 class AbstractPlacexRunner:
 
  46     """ Returns SQL commands for indexing of the placex table.
 
  49     def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
 
  51         self.analyzer = analyzer
 
  53     def index_places_query(self, batch_size: int) -> Query:
 
  56                 SET indexed_status = 0, address = v.addr, token_info = v.ti,
 
  57                     name = v.name, linked_place_id = v.linked_place_id
 
  58                 FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
 
  60             """).format(_mk_valuelist(UPDATE_LINE, batch_size))
 
  62     def index_places_params(self, place: DictRow) -> Sequence[Any]:
 
  63         return (place['place_id'],
 
  66                 place['linked_place_id'],
 
  67                 _analyze_place(place, self.analyzer))
 
  70 class RankRunner(AbstractPlacexRunner):
 
  71     """ Returns SQL commands for indexing one rank within the placex table.
 
  74     def name(self) -> str:
 
  75         return f"rank {self.rank}"
 
  77     def sql_count_objects(self) -> pysql.Composed:
 
  78         return pysql.SQL("""SELECT count(*) FROM placex
 
  79                             WHERE rank_address = {} and indexed_status > 0
 
  80                          """).format(pysql.Literal(self.rank))
 
  82     def sql_get_objects(self) -> pysql.Composed:
 
  83         return SELECT_SQL.format(pysql.SQL(
 
  84                 """WHERE placex.indexed_status > 0 and placex.rank_address = {}
 
  85                    ORDER BY placex.geometry_sector
 
  86                 """).format(pysql.Literal(self.rank)))
 
  89 class BoundaryRunner(AbstractPlacexRunner):
 
  90     """ Returns SQL commands for indexing the administrative boundaries
 
  94     def name(self) -> str:
 
  95         return f"boundaries rank {self.rank}"
 
  97     def sql_count_objects(self) -> Query:
 
  98         return pysql.SQL("""SELECT count(*) FROM placex
 
  99                             WHERE indexed_status > 0
 
 101                               AND class = 'boundary' and type = 'administrative'
 
 102                          """).format(pysql.Literal(self.rank))
 
 104     def sql_get_objects(self) -> Query:
 
 105         return SELECT_SQL.format(pysql.SQL(
 
 106                 """WHERE placex.indexed_status > 0 and placex.rank_search = {}
 
 107                          and placex.class = 'boundary' and placex.type = 'administrative'
 
 108                    ORDER BY placex.partition, placex.admin_level
 
 109                 """).format(pysql.Literal(self.rank)))
 
 112 class InterpolationRunner:
 
 113     """ Returns SQL commands for indexing the address interpolation table
 
 114         location_property_osmline.
 
 117     def __init__(self, analyzer: AbstractAnalyzer) -> None:
 
 118         self.analyzer = analyzer
 
 120     def name(self) -> str:
 
 121         return "interpolation lines (location_property_osmline)"
 
 123     def sql_count_objects(self) -> Query:
 
 124         return """SELECT count(*) FROM location_property_osmline
 
 125                   WHERE indexed_status > 0"""
 
 127     def sql_get_objects(self) -> Query:
 
 128         return """SELECT place_id, get_interpolation_address(address, osm_id) as address
 
 129                   FROM location_property_osmline
 
 130                   WHERE indexed_status > 0
 
 131                   ORDER BY geometry_sector"""
 
 133     def index_places_query(self, batch_size: int) -> Query:
 
 134         return pysql.SQL("""UPDATE location_property_osmline
 
 135                             SET indexed_status = 0, address = v.addr, token_info = v.ti
 
 136                             FROM (VALUES {}) as v(id, addr, ti)
 
 137                             WHERE place_id = v.id
 
 138                          """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", batch_size))
 
 140     def index_places_params(self, place: DictRow) -> Sequence[Any]:
 
 141         return (place['place_id'], place['address'],
 
 142                 _analyze_place(place, self.analyzer))
 
 145 class PostcodeRunner(Runner):
 
 146     """ Provides the SQL commands for indexing the location_postcode table.
 
 149     def name(self) -> str:
 
 150         return "postcodes (location_postcode)"
 
 152     def sql_count_objects(self) -> Query:
 
 153         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
 
 155     def sql_get_objects(self) -> Query:
 
 156         return """SELECT place_id FROM location_postcode
 
 157                   WHERE indexed_status > 0
 
 158                   ORDER BY country_code, postcode"""
 
 160     def index_places_query(self, batch_size: int) -> Query:
 
 161         return pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
 
 162                                     WHERE place_id IN ({})""")\
 
 163                     .format(pysql.SQL(',').join((pysql.Placeholder() for _ in range(batch_size))))
 
 165     def index_places_params(self, place: DictRow) -> Sequence[Any]:
 
 166         return (place['place_id'], )