1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Implementation of place lookup by ID.
 
  10 from typing import Optional, Callable, Tuple, Type
 
  13 import sqlalchemy as sa
 
  15 from nominatim.typing import SaColumn, SaRow, SaSelect
 
  16 from nominatim.api.connection import SearchConnection
 
  17 import nominatim.api.types as ntyp
 
  18 import nominatim.api.results as nres
 
  19 from nominatim.api.logging import log
 
  21 RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
 
  23 GeomFunc = Callable[[SaSelect, SaColumn], SaSelect]
 
  27 async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
 
  28                          add_geometries: GeomFunc) -> Optional[SaRow]:
 
  29     """ Search for the given place in the placex table and return the
 
  32     log().section("Find in placex table")
 
  34     sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
 
  35                     t.c.class_, t.c.type, t.c.admin_level,
 
  36                     t.c.address, t.c.extratags,
 
  37                     t.c.housenumber, t.c.postcode, t.c.country_code,
 
  38                     t.c.importance, t.c.wikipedia, t.c.indexed_date,
 
  39                     t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
 
  41                     t.c.geometry.ST_Expand(0).label('bbox'),
 
  44     if isinstance(place, ntyp.PlaceID):
 
  45         sql = sql.where(t.c.place_id == place.place_id)
 
  46     elif isinstance(place, ntyp.OsmID):
 
  47         sql = sql.where(t.c.osm_type == place.osm_type)\
 
  48                  .where(t.c.osm_id == place.osm_id)
 
  50             sql = sql.where(t.c.class_ == place.osm_class)
 
  52             sql = sql.order_by(t.c.class_)
 
  57     return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
 
  60 async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
 
  61                           add_geometries: GeomFunc) -> Optional[SaRow]:
 
  62     """ Search for the given place in the osmline table and return the
 
  65     log().section("Find in interpolation table")
 
  67     sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
 
  68                     t.c.indexed_date, t.c.startnumber, t.c.endnumber,
 
  69                     t.c.step, t.c.address, t.c.postcode, t.c.country_code,
 
  70                     t.c.linegeo.ST_Centroid().label('centroid'))
 
  72     if isinstance(place, ntyp.PlaceID):
 
  73         sql = sql.where(t.c.place_id == place.place_id)
 
  74     elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
 
  75         # There may be multiple interpolations for a single way.
 
  76         # If 'class' contains a number, return the one that belongs to that number.
 
  77         sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
 
  78         if place.osm_class and place.osm_class.isdigit():
 
  79             sql = sql.order_by(sa.func.greatest(0,
 
  80                                                 int(place.osm_class) - t.c.endnumber,
 
  81                                                 t.c.startnumber - int(place.osm_class)))
 
  85     return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
 
  88 async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
 
  89                         add_geometries: GeomFunc) -> Optional[SaRow]:
 
  90     """ Search for the given place in the table of Tiger addresses and return
 
  91         the base information. Only lookup by place ID is supported.
 
  93     if not isinstance(place, ntyp.PlaceID):
 
  96     log().section("Find in TIGER table")
 
  98     parent = conn.t.placex
 
  99     sql = sa.select(t.c.place_id, t.c.parent_place_id,
 
 100                     parent.c.osm_type, parent.c.osm_id,
 
 101                     t.c.startnumber, t.c.endnumber, t.c.step,
 
 103                     t.c.linegeo.ST_Centroid().label('centroid'))\
 
 104             .where(t.c.place_id == place.place_id)\
 
 105             .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
 
 107     return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
 
 110 async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
 
 111                            add_geometries: GeomFunc) -> Optional[SaRow]:
 
 112     """ Search for the given place in the postcode table and return the
 
 113         base information. Only lookup by place ID is supported.
 
 115     if not isinstance(place, ntyp.PlaceID):
 
 118     log().section("Find in postcode table")
 
 120     sql = sa.select(t.c.place_id, t.c.parent_place_id,
 
 121                     t.c.rank_search, t.c.rank_address,
 
 122                     t.c.indexed_date, t.c.postcode, t.c.country_code,
 
 123                     t.c.geometry.label('centroid')) \
 
 124             .where(t.c.place_id == place.place_id)
 
 126     return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
 
 129 async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
 
 130                              add_geometries: GeomFunc
 
 131                             ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
 
 132     """ Search for the given place in all data tables
 
 133         and return the base information.
 
 135     row = await find_in_placex(conn, place, add_geometries)
 
 136     log().var_dump('Result (placex)', row)
 
 138         return row, nres.create_from_placex_row
 
 140     row = await find_in_osmline(conn, place, add_geometries)
 
 141     log().var_dump('Result (osmline)', row)
 
 143         return row, nres.create_from_osmline_row
 
 145     row = await find_in_postcode(conn, place, add_geometries)
 
 146     log().var_dump('Result (postcode)', row)
 
 148         return row, nres.create_from_postcode_row
 
 150     row = await find_in_tiger(conn, place, add_geometries)
 
 151     log().var_dump('Result (tiger)', row)
 
 152     return row, nres.create_from_tiger_row
 
 155 async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
 
 156                              details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
 
 157     """ Retrieve a place with additional details from the database.
 
 159     log().function('get_detailed_place', place=place, details=details)
 
 161     if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
 
 162         raise ValueError("lookup only supports geojosn polygon output.")
 
 164     if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
 
 165         def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
 
 166             return sql.add_columns(sa.func.ST_AsGeoJSON(
 
 167                                     sa.case((sa.func.ST_NPoints(column) > 5000,
 
 168                                              sa.func.ST_SimplifyPreserveTopology(column, 0.0001)),
 
 169                                             else_=column), 7).label('geometry_geojson'))
 
 171         def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
 
 172             return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type'))
 
 174     row_func: RowFunc[nres.DetailedResult]
 
 175     row, row_func = await find_in_all_tables(conn, place, _add_geometry)
 
 180     result = row_func(row, nres.DetailedResult)
 
 181     assert result is not None
 
 183     # add missing details
 
 184     assert result is not None
 
 185     if 'type' in result.geometry:
 
 186         result.geometry['type'] = GEOMETRY_TYPE_MAP.get(result.geometry['type'],
 
 187                                                         result.geometry['type'])
 
 188     indexed_date = getattr(row, 'indexed_date', None)
 
 189     if indexed_date is not None:
 
 190         result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
 
 192     await nres.add_result_details(conn, [result], details)
 
 197 async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
 
 198                            details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
 
 199     """ Retrieve a place as a simple search result from the database.
 
 201     log().function('get_simple_place', place=place, details=details)
 
 203     def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect:
 
 204         if not details.geometry_output:
 
 209         if details.geometry_simplification > 0.0:
 
 210             col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
 
 212         if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
 
 213             out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
 
 214         if details.geometry_output & ntyp.GeometryFormat.TEXT:
 
 215             out.append(sa.func.ST_AsText(col).label('geometry_text'))
 
 216         if details.geometry_output & ntyp.GeometryFormat.KML:
 
 217             out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
 
 218         if details.geometry_output & ntyp.GeometryFormat.SVG:
 
 219             out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
 
 221         return sql.add_columns(*out)
 
 224     row_func: RowFunc[nres.SearchResult]
 
 225     row, row_func = await find_in_all_tables(conn, place, _add_geometry)
 
 230     result = row_func(row, nres.SearchResult)
 
 231     assert result is not None
 
 233     # add missing details
 
 234     assert result is not None
 
 235     if hasattr(row, 'bbox'):
 
 236         result.bbox = ntyp.Bbox.from_wkb(row.bbox)
 
 238     await nres.add_result_details(conn, [result], details)
 
 243 GEOMETRY_TYPE_MAP = {
 
 245     'MULTIPOINT': 'ST_MultiPoint',
 
 246     'LINESTRING': 'ST_LineString',
 
 247     'MULTILINESTRING': 'ST_MultiLineString',
 
 248     'POLYGON': 'ST_Polygon',
 
 249     'MULTIPOLYGON': 'ST_MultiPolygon',
 
 250     'GEOMETRYCOLLECTION': 'ST_GeometryCollection'