1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Implementation of place lookup by ID (doing many places at once).
 
  10 from typing import Optional, Callable, Type, Iterable, Tuple, Union
 
  11 from dataclasses import dataclass
 
  14 import sqlalchemy as sa
 
  16 from .typing import SaColumn, SaRow, SaSelect
 
  17 from .connection import SearchConnection
 
  18 from .logging import log
 
  19 from . import types as ntyp
 
  20 from . import results as nres
 
  22 RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
 
  26     'MULTIPOINT': 'ST_MultiPoint',
 
  27     'LINESTRING': 'ST_LineString',
 
  28     'MULTILINESTRING': 'ST_MultiLineString',
 
  29     'POLYGON': 'ST_Polygon',
 
  30     'MULTIPOLYGON': 'ST_MultiPolygon',
 
  31     'GEOMETRYCOLLECTION': 'ST_GeometryCollection'
 
  37     """ Data class saving the SQL result for a single lookup.
 
  40     result: Optional[nres.SearchResult] = None
 
  43 class LookupCollector:
 
  44     """ Result collector for the simple lookup.
 
  46         Allows for lookup of multiple places simultaneously.
 
  49     def __init__(self, places: Iterable[ntyp.PlaceRef],
 
  50                  details: ntyp.LookupDetails) -> None:
 
  51         self.details = details
 
  52         self.lookups = [LookupTuple(p) for p in places]
 
  54     def get_results(self) -> nres.SearchResults:
 
  55         """ Return the list of results available.
 
  57         return nres.SearchResults(p.result for p in self.lookups if p.result is not None)
 
  59     async def add_rows_from_sql(self, conn: SearchConnection, sql: SaSelect,
 
  60                                 col: SaColumn, row_func: RowFunc[nres.SearchResult]) -> bool:
 
  61         if self.details.geometry_output:
 
  62             if self.details.geometry_simplification > 0.0:
 
  63                 col = sa.func.ST_SimplifyPreserveTopology(
 
  64                     col, self.details.geometry_simplification)
 
  66             if self.details.geometry_output & ntyp.GeometryFormat.GEOJSON:
 
  67                 sql = sql.add_columns(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
 
  68             if self.details.geometry_output & ntyp.GeometryFormat.TEXT:
 
  69                 sql = sql.add_columns(sa.func.ST_AsText(col).label('geometry_text'))
 
  70             if self.details.geometry_output & ntyp.GeometryFormat.KML:
 
  71                 sql = sql.add_columns(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
 
  72             if self.details.geometry_output & ntyp.GeometryFormat.SVG:
 
  73                 sql = sql.add_columns(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
 
  75         for row in await conn.execute(sql):
 
  76             result = row_func(row, nres.SearchResult)
 
  77             assert result is not None
 
  78             if hasattr(row, 'bbox'):
 
  79                 result.bbox = ntyp.Bbox.from_wkb(row.bbox)
 
  81             if self.lookups[row._idx].result is None:
 
  82                 self.lookups[row._idx].result = result
 
  84         return all(p.result is not None for p in self.lookups)
 
  86     def enumerate_free_place_ids(self) -> Iterable[Tuple[int, ntyp.PlaceID]]:
 
  87         return ((i, p.pid) for i, p in enumerate(self.lookups)
 
  88                 if p.result is None and isinstance(p.pid, ntyp.PlaceID))
 
  90     def enumerate_free_osm_ids(self) -> Iterable[Tuple[int, ntyp.OsmID]]:
 
  91         return ((i, p.pid) for i, p in enumerate(self.lookups)
 
  92                 if p.result is None and isinstance(p.pid, ntyp.OsmID))
 
  95 class DetailedCollector:
 
  96     """ Result collector for detailed lookup.
 
  98         Only one place at the time may be looked up.
 
 101     def __init__(self, place: ntyp.PlaceRef, with_geometry: bool) -> None:
 
 102         self.with_geometry = with_geometry
 
 104         self.result: Optional[nres.DetailedResult] = None
 
 106     async def add_rows_from_sql(self, conn: SearchConnection, sql: SaSelect,
 
 107                                 col: SaColumn, row_func: RowFunc[nres.DetailedResult]) -> bool:
 
 108         if self.with_geometry:
 
 109             sql = sql.add_columns(
 
 110                 sa.func.ST_AsGeoJSON(
 
 111                     sa.case((sa.func.ST_NPoints(col) > 5000,
 
 112                              sa.func.ST_SimplifyPreserveTopology(col, 0.0001)),
 
 113                             else_=col), 7).label('geometry_geojson'))
 
 115             sql = sql.add_columns(sa.func.ST_GeometryType(col).label('geometry_type'))
 
 117         for row in await conn.execute(sql):
 
 118             self.result = row_func(row, nres.DetailedResult)
 
 119             assert self.result is not None
 
 120             # add missing details
 
 121             if 'type' in self.result.geometry:
 
 122                 self.result.geometry['type'] = \
 
 123                     GEOMETRY_TYPE_MAP.get(self.result.geometry['type'],
 
 124                                           self.result.geometry['type'])
 
 125             indexed_date = getattr(row, 'indexed_date', None)
 
 126             if indexed_date is not None:
 
 127                 self.result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
 
 134     def enumerate_free_place_ids(self) -> Iterable[Tuple[int, ntyp.PlaceID]]:
 
 135         if self.result is None and isinstance(self.place, ntyp.PlaceID):
 
 136             return [(0, self.place)]
 
 139     def enumerate_free_osm_ids(self) -> Iterable[Tuple[int, ntyp.OsmID]]:
 
 140         if self.result is None and isinstance(self.place, ntyp.OsmID):
 
 141             return [(0, self.place)]
 
 145 Collector = Union[LookupCollector, DetailedCollector]
 
 148 async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
 
 149                              details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
 
 150     """ Retrieve a place with additional details from the database.
 
 152     log().function('get_detailed_place', place=place, details=details)
 
 154     if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
 
 155         raise ValueError("lookup only supports geojosn polygon output.")
 
 157     collector = DetailedCollector(place,
 
 158                                   bool(details.geometry_output & ntyp.GeometryFormat.GEOJSON))
 
 160     for func in (find_in_placex, find_in_osmline, find_in_postcode, find_in_tiger):
 
 161         if await func(conn, collector):
 
 164     if collector.result is not None:
 
 165         await nres.add_result_details(conn, [collector.result], details)
 
 167     return collector.result
 
 170 async def get_places(conn: SearchConnection, places: Iterable[ntyp.PlaceRef],
 
 171                      details: ntyp.LookupDetails) -> nres.SearchResults:
 
 172     """ Retrieve a list of places as simple search results from the
 
 175     log().function('get_places', places=places, details=details)
 
 177     collector = LookupCollector(places, details)
 
 179     for func in (find_in_placex, find_in_osmline, find_in_postcode, find_in_tiger):
 
 180         if await func(conn, collector):
 
 183     results = collector.get_results()
 
 184     await nres.add_result_details(conn, results, details)
 
 189 async def find_in_placex(conn: SearchConnection, collector: Collector) -> bool:
 
 190     """ Search for the given places in the main placex table.
 
 192     log().section("Find in placex table")
 
 194     sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
 
 195                     t.c.class_, t.c.type, t.c.admin_level,
 
 196                     t.c.address, t.c.extratags,
 
 197                     t.c.housenumber, t.c.postcode, t.c.country_code,
 
 198                     t.c.importance, t.c.wikipedia, t.c.indexed_date,
 
 199                     t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
 
 201                     t.c.geometry.ST_Expand(0).label('bbox'),
 
 204     for osm_type in ('N', 'W', 'R'):
 
 205         osm_ids = [{'i': i, 'oi': p.osm_id, 'oc': p.osm_class or ''}
 
 206                    for i, p in collector.enumerate_free_osm_ids()
 
 207                    if p.osm_type == osm_type]
 
 210             oid_tab = sa.func.JsonArrayEach(sa.type_coerce(osm_ids, sa.JSON))\
 
 211                         .table_valued(sa.column('value', type_=sa.JSON))
 
 212             psql = sql.add_columns(oid_tab.c.value['i'].as_integer().label('_idx'))\
 
 213                       .where(t.c.osm_type == osm_type)\
 
 214                       .where(t.c.osm_id == oid_tab.c.value['oi'].as_string().cast(sa.BigInteger))\
 
 215                       .where(sa.or_(oid_tab.c.value['oc'].as_string() == '',
 
 216                                     oid_tab.c.value['oc'].as_string() == t.c.class_))\
 
 217                       .order_by(t.c.class_)
 
 219             if await collector.add_rows_from_sql(conn, psql, t.c.geometry,
 
 220                                                  nres.create_from_placex_row):
 
 223     place_ids = [{'i': i, 'id': p.place_id}
 
 224                  for i, p in collector.enumerate_free_place_ids()]
 
 227         pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
 
 228                     .table_valued(sa.column('value', type_=sa.JSON))
 
 229         psql = sql.add_columns(pid_tab.c.value['i'].as_integer().label('_idx'))\
 
 230                   .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))
 
 232         return await collector.add_rows_from_sql(conn, psql, t.c.geometry,
 
 233                                                  nres.create_from_placex_row)
 
 238 async def find_in_osmline(conn: SearchConnection, collector: Collector) -> bool:
 
 239     """ Search for the given places in the table for address interpolations.
 
 241         Return true when all places have been resolved.
 
 243     log().section("Find in interpolation table")
 
 245     sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
 
 246                     t.c.indexed_date, t.c.startnumber, t.c.endnumber,
 
 247                     t.c.step, t.c.address, t.c.postcode, t.c.country_code,
 
 248                     t.c.linegeo.ST_Centroid().label('centroid'))
 
 250     osm_ids = [{'i': i, 'oi': p.osm_id, 'oc': p.class_as_housenumber()}
 
 251                for i, p in collector.enumerate_free_osm_ids() if p.osm_type == 'W']
 
 254         oid_tab = sa.func.JsonArrayEach(sa.type_coerce(osm_ids, sa.JSON))\
 
 255                     .table_valued(sa.column('value', type_=sa.JSON))
 
 256         psql = sql.add_columns(oid_tab.c.value['i'].as_integer().label('_idx'))\
 
 257                   .where(t.c.osm_id == oid_tab.c.value['oi'].as_string().cast(sa.BigInteger))\
 
 258                   .order_by(sa.func.greatest(0,
 
 259                                              oid_tab.c.value['oc'].as_integer() - t.c.endnumber,
 
 260                                              t.c.startnumber - oid_tab.c.value['oc'].as_integer()))
 
 262         if await collector.add_rows_from_sql(conn, psql, t.c.linegeo,
 
 263                                              nres.create_from_osmline_row):
 
 266     place_ids = [{'i': i, 'id': p.place_id}
 
 267                  for i, p in collector.enumerate_free_place_ids()]
 
 270         pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
 
 271                     .table_valued(sa.column('value', type_=sa.JSON))
 
 272         psql = sql.add_columns(pid_tab.c.value['i'].label('_idx'))\
 
 273                   .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))
 
 275         return await collector.add_rows_from_sql(conn, psql, t.c.linegeo,
 
 276                                                  nres.create_from_osmline_row)
 
 281 async def find_in_postcode(conn: SearchConnection, collector: Collector) -> bool:
 
 282     """ Search for the given places in the postcode table.
 
 284         Return true when all places have been resolved.
 
 286     log().section("Find in postcode table")
 
 288     place_ids = [{'i': i, 'id': p.place_id}
 
 289                  for i, p in collector.enumerate_free_place_ids()]
 
 292         pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
 
 293                     .table_valued(sa.column('value', type_=sa.JSON))
 
 295         sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'),
 
 296                         t.c.place_id, t.c.parent_place_id,
 
 297                         t.c.rank_search, t.c.rank_address,
 
 298                         t.c.indexed_date, t.c.postcode, t.c.country_code,
 
 299                         t.c.geometry.label('centroid'))\
 
 300                 .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))
 
 302         return await collector.add_rows_from_sql(conn, sql, t.c.geometry,
 
 303                                                  nres.create_from_postcode_row)
 
 308 async def find_in_tiger(conn: SearchConnection, collector: Collector) -> bool:
 
 309     """ Search for the given places in the TIGER address table.
 
 311         Return true when all places have been resolved.
 
 313     log().section("Find in tiger table")
 
 315     place_ids = [{'i': i, 'id': p.place_id}
 
 316                  for i, p in collector.enumerate_free_place_ids()]
 
 319         pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
 
 320                     .table_valued(sa.column('value', type_=sa.JSON))
 
 322         parent = conn.t.placex
 
 323         sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'),
 
 324                         t.c.place_id, t.c.parent_place_id,
 
 325                         parent.c.osm_type, parent.c.osm_id,
 
 326                         t.c.startnumber, t.c.endnumber, t.c.step,
 
 328                         t.c.linegeo.ST_Centroid().label('centroid'))\
 
 329                 .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)\
 
 330                 .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))
 
 332         return await collector.add_rows_from_sql(conn, sql, t.c.linegeo,
 
 333                                                  nres.create_from_tiger_row)