# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of place lookup by ID (doing many places at once).
"""
from typing import Optional, Callable, Type, Iterable, Tuple, Union
from dataclasses import dataclass
import datetime as dt

import sqlalchemy as sa

from .typing import SaColumn, SaRow, SaSelect
from .connection import SearchConnection
from .logging import log
from . import types as ntyp
from . import results as nres

RowFunc = Callable[[SaRow, Type[nres.BaseResultT]], nres.BaseResultT]
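
# GEOMETRY_TYPE_MAP maps plain geometry type names to the ST_* spelling that
# is reported in detailed results (see DetailedCollector.add_rows_from_sql).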
 
GEOMETRY_TYPE_MAP = {
    'POINT': 'ST_Point',
    'MULTIPOINT': 'ST_MultiPoint',
    'LINESTRING': 'ST_LineString',
    'MULTILINESTRING': 'ST_MultiLineString',
    'POLYGON': 'ST_Polygon',
    'MULTIPOLYGON': 'ST_MultiPolygon',
    'GEOMETRYCOLLECTION': 'ST_GeometryCollection'
}


@dataclass
class LookupTuple:
    """ Data class saving the SQL result for a single lookup.
    """
    pid: ntyp.PlaceRef
    result: Optional[nres.SearchResult] = None
 

class LookupCollector:
    """ Result collector for the simple lookup.

        Allows for lookup of multiple places simultaneously.
    """

    def __init__(self, places: Iterable[ntyp.PlaceRef],
                 details: ntyp.LookupDetails) -> None:
        self.details = details
        self.lookups = [LookupTuple(p) for p in places]

    def get_results(self) -> nres.SearchResults:
        """ Return the list of results available.
        """
        return nres.SearchResults(p.result for p in self.lookups if p.result is not None)
 
    async def add_rows_from_sql(self, conn: SearchConnection, sql: SaSelect,
                                col: SaColumn, row_func: RowFunc[nres.SearchResult]) -> bool:
        if self.details.geometry_output:
            if self.details.geometry_simplification > 0.0:
                col = sa.func.ST_SimplifyPreserveTopology(
                    col, self.details.geometry_simplification)

            if self.details.geometry_output & ntyp.GeometryFormat.GEOJSON:
                sql = sql.add_columns(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
            if self.details.geometry_output & ntyp.GeometryFormat.TEXT:
                sql = sql.add_columns(sa.func.ST_AsText(col).label('geometry_text'))
            if self.details.geometry_output & ntyp.GeometryFormat.KML:
                sql = sql.add_columns(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
            if self.details.geometry_output & ntyp.GeometryFormat.SVG:
                sql = sql.add_columns(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))

        # The '_idx' column of each row points back to the position of the
        # requested place in self.lookups. Only the first row per index wins.
        for row in await conn.execute(sql):
            result = row_func(row, nres.SearchResult)
            if hasattr(row, 'bbox'):
                result.bbox = ntyp.Bbox.from_wkb(row.bbox)

            if self.lookups[row._idx].result is None:
                self.lookups[row._idx].result = result

        return all(p.result is not None for p in self.lookups)
 
    def enumerate_free_place_ids(self) -> Iterable[Tuple[int, ntyp.PlaceID]]:
        return ((i, p.pid) for i, p in enumerate(self.lookups)
                if p.result is None and isinstance(p.pid, ntyp.PlaceID))

    def enumerate_free_osm_ids(self) -> Iterable[Tuple[int, ntyp.OsmID]]:
        return ((i, p.pid) for i, p in enumerate(self.lookups)
                if p.result is None and isinstance(p.pid, ntyp.OsmID))
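
# The enumerate_free_* methods of the collectors yield (index, id) pairs for
# the places that still lack a result. The index travels through the SQL as
# the '_idx' column, so add_rows_from_sql() can match rows to the requests.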
 

class DetailedCollector:
    """ Result collector for detailed lookup.

        Only one place at a time may be looked up.
    """

    def __init__(self, place: ntyp.PlaceRef, with_geometry: bool) -> None:
        self.with_geometry = with_geometry
        self.place = place
        self.result: Optional[nres.DetailedResult] = None
 
    async def add_rows_from_sql(self, conn: SearchConnection, sql: SaSelect,
                                col: SaColumn, row_func: RowFunc[nres.DetailedResult]) -> bool:
        if self.with_geometry:
            sql = sql.add_columns(
                sa.func.ST_AsGeoJSON(
                    sa.case((sa.func.ST_NPoints(col) > 5000,
                             sa.func.ST_SimplifyPreserveTopology(col, 0.0001)),
                            else_=col), 7).label('geometry_geojson'))
        else:
            sql = sql.add_columns(sa.func.ST_GeometryType(col).label('geometry_type'))

        for row in await conn.execute(sql):
            self.result = row_func(row, nres.DetailedResult)
            # add missing details
            if 'type' in self.result.geometry:
                self.result.geometry['type'] = \
                    GEOMETRY_TYPE_MAP.get(self.result.geometry['type'],
                                          self.result.geometry['type'])
            indexed_date = getattr(row, 'indexed_date', None)
            if indexed_date is not None:
                self.result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
            return True

        # Nothing was found.
        return False
 
    def enumerate_free_place_ids(self) -> Iterable[Tuple[int, ntyp.PlaceID]]:
        if self.result is None and isinstance(self.place, ntyp.PlaceID):
            return [(0, self.place)]
        return []

    def enumerate_free_osm_ids(self) -> Iterable[Tuple[int, ntyp.OsmID]]:
        if self.result is None and isinstance(self.place, ntyp.OsmID):
            return [(0, self.place)]
        return []
 

Collector = Union[LookupCollector, DetailedCollector]
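
# Both collector classes implement the same informal interface
# (add_rows_from_sql, enumerate_free_place_ids, enumerate_free_osm_ids),
# so the find_in_* functions below can work with either of them.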
 

async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
                             details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
    """ Retrieve a place with additional details from the database.
    """
    log().function('get_detailed_place', place=place, details=details)

    if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
        raise ValueError("lookup only supports geojson polygon output.")

    collector = DetailedCollector(place,
                                  bool(details.geometry_output & ntyp.GeometryFormat.GEOJSON))

    for func in (find_in_placex, find_in_osmline, find_in_postcode, find_in_tiger):
        if await func(conn, collector):
            break

    if collector.result is not None:
        await nres.add_result_details(conn, [collector.result], details)

    return collector.result
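
# A minimal usage sketch (illustrative only; assumes a SearchConnection `conn`
# obtained from the surrounding API code and default-constructed lookup details):
#
#     detailed = await get_detailed_place(conn, ntyp.OsmID('W', 1),
#                                         ntyp.LookupDetails())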
 

async def get_places(conn: SearchConnection, places: Iterable[ntyp.PlaceRef],
                     details: ntyp.LookupDetails) -> nres.SearchResults:
    """ Retrieve a list of places as simple search results from the
        database.
    """
    log().function('get_places', places=places, details=details)

    collector = LookupCollector(places, details)

    for func in (find_in_placex, find_in_osmline, find_in_postcode, find_in_tiger):
        if await func(conn, collector):
            break

    results = collector.get_results()
    await nres.add_result_details(conn, results, details)

    return results
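
# Each find_in_* function below queries one table and follows the same pattern:
# collect the ids that are still unresolved, pass them to the database as a JSON
# array which JsonArrayEach() expands into one row per requested id, and join
# that against the table. The 'i' member of each JSON element is the caller's
# index and is passed back as '_idx'. Each function returns True once all
# requested places have a result, which stops the lookup cascade above.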
 

async def find_in_placex(conn: SearchConnection, collector: Collector) -> bool:
    """ Search for the given places in the main placex table.
    """
    log().section("Find in placex table")
    t = conn.t.placex
    sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
                    t.c.class_, t.c.type, t.c.admin_level,
                    t.c.address, t.c.extratags,
                    t.c.housenumber, t.c.postcode, t.c.country_code,
                    t.c.importance, t.c.wikipedia, t.c.indexed_date,
                    t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
                    t.c.geometry.ST_Expand(0).label('bbox'),
                    t.c.centroid)

    for osm_type in ('N', 'W', 'R'):
        osm_ids = [{'i': i, 'oi': p.osm_id, 'oc': p.osm_class or ''}
                   for i, p in collector.enumerate_free_osm_ids()
                   if p.osm_type == osm_type]
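
        # 'oc' carries the requested OSM class; an empty string means that any
        # class is acceptable. Ordering by class keeps the choice stable when
        # several places share the same OSM id.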
 
        if osm_ids:
            oid_tab = sa.func.JsonArrayEach(sa.type_coerce(osm_ids, sa.JSON))\
                        .table_valued(sa.column('value', type_=sa.JSON))
            psql = sql.add_columns(oid_tab.c.value['i'].as_integer().label('_idx'))\
                      .where(t.c.osm_type == osm_type)\
                      .where(t.c.osm_id == oid_tab.c.value['oi'].as_string().cast(sa.BigInteger))\
                      .where(sa.or_(oid_tab.c.value['oc'].as_string() == '',
                                    oid_tab.c.value['oc'].as_string() == t.c.class_))\
                      .order_by(t.c.class_)

            if await collector.add_rows_from_sql(conn, psql, t.c.geometry,
                                                 nres.create_from_placex_row):
                return True

    place_ids = [{'i': i, 'id': p.place_id}
                 for i, p in collector.enumerate_free_place_ids()]

    if place_ids:
        pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
                    .table_valued(sa.column('value', type_=sa.JSON))
        psql = sql.add_columns(pid_tab.c.value['i'].as_integer().label('_idx'))\
                  .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))

        return await collector.add_rows_from_sql(conn, psql, t.c.geometry,
                                                 nres.create_from_placex_row)

    return False
 

async def find_in_osmline(conn: SearchConnection, collector: Collector) -> bool:
    """ Search for the given places in the table for address interpolations.

        Return true when all places have been resolved.
    """
    log().section("Find in interpolation table")
    t = conn.t.osmline
    sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
                    t.c.indexed_date, t.c.startnumber, t.c.endnumber,
                    t.c.step, t.c.address, t.c.postcode, t.c.country_code,
                    t.c.linegeo.ST_Centroid().label('centroid'))

    osm_ids = [{'i': i, 'oi': p.osm_id, 'oc': p.class_as_housenumber()}
               for i, p in collector.enumerate_free_osm_ids() if p.osm_type == 'W']
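
    # 'oc' is the house number requested through the class parameter (if any).
    # The ordering below prefers the interpolation line whose number range is
    # closest to that house number.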
 
    if osm_ids:
        oid_tab = sa.func.JsonArrayEach(sa.type_coerce(osm_ids, sa.JSON))\
                    .table_valued(sa.column('value', type_=sa.JSON))
        psql = sql.add_columns(oid_tab.c.value['i'].as_integer().label('_idx'))\
                  .where(t.c.osm_id == oid_tab.c.value['oi'].as_string().cast(sa.BigInteger))\
                  .order_by(sa.func.greatest(0,
                                             oid_tab.c.value['oc'].as_integer() - t.c.endnumber,
                                             t.c.startnumber - oid_tab.c.value['oc'].as_integer()))

        if await collector.add_rows_from_sql(conn, psql, t.c.linegeo,
                                             nres.create_from_osmline_row):
            return True

    place_ids = [{'i': i, 'id': p.place_id}
                 for i, p in collector.enumerate_free_place_ids()]

    if place_ids:
        pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
                    .table_valued(sa.column('value', type_=sa.JSON))
        psql = sql.add_columns(pid_tab.c.value['i'].as_integer().label('_idx'))\
                  .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))

        return await collector.add_rows_from_sql(conn, psql, t.c.linegeo,
                                                 nres.create_from_osmline_row)

    return False
 

async def find_in_postcode(conn: SearchConnection, collector: Collector) -> bool:
    """ Search for the given places in the postcode table.

        Return true when all places have been resolved.
    """
    log().section("Find in postcode table")

    place_ids = [{'i': i, 'id': p.place_id}
                 for i, p in collector.enumerate_free_place_ids()]

    if place_ids:
        pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
                    .table_valued(sa.column('value', type_=sa.JSON))
        t = conn.t.postcode
        sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'),
                        t.c.place_id, t.c.parent_place_id,
                        t.c.rank_search, t.c.rank_address,
                        t.c.indexed_date, t.c.postcode, t.c.country_code,
                        t.c.geometry.label('centroid'))\
                .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))

        return await collector.add_rows_from_sql(conn, sql, t.c.geometry,
                                                 nres.create_from_postcode_row)

    return False
 

async def find_in_tiger(conn: SearchConnection, collector: Collector) -> bool:
    """ Search for the given places in the TIGER address table.

        Return true when all places have been resolved.
    """
    log().section("Find in tiger table")

    place_ids = [{'i': i, 'id': p.place_id}
                 for i, p in collector.enumerate_free_place_ids()]

    if place_ids:
        pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\
                    .table_valued(sa.column('value', type_=sa.JSON))
        t = conn.t.tiger
        parent = conn.t.placex
        sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'),
                        t.c.place_id, t.c.parent_place_id,
                        parent.c.osm_type, parent.c.osm_id,
                        t.c.startnumber, t.c.endnumber, t.c.step,
                        t.c.postcode,
                        t.c.linegeo.ST_Centroid().label('centroid'))\
                .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)\
                .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger))

        return await collector.add_rows_from_sql(conn, sql, t.c.linegeo,
                                                 nres.create_from_tiger_row)

    return False