1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Implementation of reverse geocoding.
 
  10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any
 
  12 import sqlalchemy as sa
 
  14 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
 
  15                              SaBind, SaLambdaSelect
 
  16 from nominatim.api.connection import SearchConnection
 
  17 import nominatim.api.results as nres
 
  18 from nominatim.api.logging import log
 
  19 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
 
  20 from nominatim.db.sqlalchemy_types import Geometry
 
  21 import nominatim.db.sqlalchemy_functions as snfn
 
  23 # In SQLAlchemy expression which compare with NULL need to be expressed with
 
  25 # pylint: disable=singleton-comparison
 
  27 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
 
  29 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
 
  30 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
 
  32 def no_index(expr: SaColumn) -> SaColumn:
 
  33     """ Wrap the given expression, so that the query planner will
 
  34         refrain from using the expression for index lookup.
 
  36     return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
 
  39 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
 
  40     """ Create a select statement with the columns relevant for reverse
 
  44         distance = t.c.distance
 
  45         centroid = t.c.centroid
 
  47         distance = t.c.geometry.ST_Distance(WKT_PARAM)
 
  48         centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
 
  49                            else_=t.c.centroid).label('centroid')
 
  52     return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
 
  54                      t.c.address, t.c.extratags,
 
  55                      t.c.housenumber, t.c.postcode, t.c.country_code,
 
  56                      t.c.importance, t.c.wikipedia,
 
  57                      t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
 
  59                      distance.label('distance'),
 
  60                      t.c.geometry.ST_Expand(0).label('bbox'))
 
  63 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
 
  64     return sa.cast(table.c.startnumber
 
  65                     + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
 
  66                                     / table.c.step) * table.c.step,
 
  67                    sa.Integer).label('housenumber')
 
  70 def _interpolated_position(table: SaFromClause) -> SaLabel:
 
  71     fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
 
  72     rounded_pos = sa.func.round(table.c.position / fac) * fac
 
  74              (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
 
  75               else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
 
  78 def _locate_interpolation(table: SaFromClause) -> SaLabel:
 
  79     """ Given a position, locate the closest point on the line.
 
  81     return sa.case((table.c.linegeo.is_line_like(),
 
  82                     table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
 
  83                    else_=0).label('position')
 
  86 def _is_address_point(table: SaFromClause) -> SaColumn:
 
  87     return sa.and_(table.c.rank_address == 30,
 
  88                    sa.or_(table.c.housenumber != None,
 
  89                           table.c.name.has_key('housename')))
 
  92 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
 
  93     return min(rows, key=lambda row: 1000 if row is None else row.distance)
 
  96 class ReverseGeocoder:
 
  97     """ Class implementing the logic for looking up a place from a
 
 101     def __init__(self, conn: SearchConnection, params: ReverseDetails) -> None:
 
 105         self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
 
 109     def max_rank(self) -> int:
 
 110         """ Return the maximum configured rank.
 
 112         return self.params.max_rank
 
 115     def has_geometries(self) -> bool:
 
 116         """ Check if any geometries are requested.
 
 118         return bool(self.params.geometry_output)
 
 121     def layer_enabled(self, *layer: DataLayer) -> bool:
 
 122         """ Return true when any of the given layer types are requested.
 
 124         return any(self.params.layers & l for l in layer)
 
 127     def layer_disabled(self, *layer: DataLayer) -> bool:
 
 128         """ Return true when none of the given layer types is requested.
 
 130         return not any(self.params.layers & l for l in layer)
 
 133     def has_feature_layers(self) -> bool:
 
 134         """ Return true if any layer other than ADDRESS or POI is requested.
 
 136         return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
 
 139     def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
 
 142         if self.params.geometry_simplification > 0.0:
 
 143             col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
 
 145         if self.params.geometry_output & GeometryFormat.GEOJSON:
 
 146             out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
 
 147         if self.params.geometry_output & GeometryFormat.TEXT:
 
 148             out.append(sa.func.ST_AsText(col).label('geometry_text'))
 
 149         if self.params.geometry_output & GeometryFormat.KML:
 
 150             out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
 
 151         if self.params.geometry_output & GeometryFormat.SVG:
 
 152             out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
 
 154         return sql.add_columns(*out)
 
 157     def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
 
 158         if self.layer_enabled(DataLayer.MANMADE):
 
 160             if self.layer_disabled(DataLayer.RAILWAY):
 
 161                 exclude.append('railway')
 
 162             if self.layer_disabled(DataLayer.NATURAL):
 
 163                 exclude.extend(('natural', 'water', 'waterway'))
 
 164             return table.c.class_.not_in(tuple(exclude))
 
 167         if self.layer_enabled(DataLayer.RAILWAY):
 
 168             include.append('railway')
 
 169         if self.layer_enabled(DataLayer.NATURAL):
 
 170             include.extend(('natural', 'water', 'waterway'))
 
 171         return table.c.class_.in_(tuple(include))
 
 174     async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
 
 175         """ Look up the closest rank 26+ place in the database, which
 
 176             is closer than the given distance.
 
 178         t = self.conn.t.placex
 
 180         # PostgreSQL must not get the distance as a parameter because
 
 181         # there is a danger it won't be able to proberly estimate index use
 
 182         # when used with prepared statements
 
 183         diststr = sa.text(f"{distance}")
 
 185         sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
 
 186                 .where(t.c.geometry.ST_DWithin(WKT_PARAM, diststr))
 
 187                 .where(t.c.indexed_status == 0)
 
 188                 .where(t.c.linked_place_id == None)
 
 189                 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
 
 190                               t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
 
 191                 .order_by('distance')
 
 194         if self.has_geometries():
 
 195             sql = self._add_geometry_columns(sql, t.c.geometry)
 
 197         restrict: List[SaColumn] = []
 
 199         if self.layer_enabled(DataLayer.ADDRESS):
 
 200             restrict.append(no_index(t.c.rank_address).between(26, min(29, self.max_rank)))
 
 201             if self.max_rank == 30:
 
 202                 restrict.append(_is_address_point(t))
 
 203         if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
 
 204             restrict.append(sa.and_(no_index(t.c.rank_search) == 30,
 
 205                                     t.c.class_.not_in(('place', 'building')),
 
 206                                     sa.not_(t.c.geometry.is_line_like())))
 
 207         if self.has_feature_layers():
 
 208             restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
 
 209                                     no_index(t.c.rank_address) == 0,
 
 210                                     self._filter_by_layer(t)))
 
 215         sql = sql.where(sa.or_(*restrict))
 
 217         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 220     async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
 
 221         t = self.conn.t.placex
 
 223         sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
 
 224                 .where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))
 
 225                 .where(t.c.parent_place_id == parent_place_id)
 
 226                 .where(_is_address_point(t))
 
 227                 .where(t.c.indexed_status == 0)
 
 228                 .where(t.c.linked_place_id == None)
 
 229                 .order_by('distance')
 
 232         if self.has_geometries():
 
 233             sql = self._add_geometry_columns(sql, t.c.geometry)
 
 235         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 238     async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
 
 239                                              distance: float) -> Optional[SaRow]:
 
 240         t = self.conn.t.osmline
 
 242         sql: Any = sa.lambda_stmt(lambda:
 
 244                              t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
 
 245                              _locate_interpolation(t))
 
 246                      .where(t.c.linegeo.ST_DWithin(WKT_PARAM, distance))
 
 247                      .where(t.c.startnumber != None)
 
 248                      .order_by('distance')
 
 251         if parent_place_id is not None:
 
 252             sql += lambda s: s.where(t.c.parent_place_id == parent_place_id)
 
 254         def _wrap_query(base_sql: SaLambdaSelect) -> SaSelect:
 
 255             inner = base_sql.subquery('ipol')
 
 257             return sa.select(inner.c.place_id, inner.c.osm_id,
 
 258                              inner.c.parent_place_id, inner.c.address,
 
 259                              _interpolated_housenumber(inner),
 
 260                              _interpolated_position(inner),
 
 261                              inner.c.postcode, inner.c.country_code,
 
 266         if self.has_geometries():
 
 267             sub = sql.subquery('geom')
 
 268             sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
 
 270         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 273     async def _find_tiger_number_for_street(self, parent_place_id: int,
 
 275                                             parent_id: int) -> Optional[SaRow]:
 
 276         t = self.conn.t.tiger
 
 278         def _base_query() -> SaSelect:
 
 280                               t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
 
 281                               _locate_interpolation(t))\
 
 282                       .where(t.c.linegeo.ST_DWithin(WKT_PARAM, 0.001))\
 
 283                       .where(t.c.parent_place_id == parent_place_id)\
 
 284                       .order_by('distance')\
 
 288             return sa.select(inner.c.place_id,
 
 289                              inner.c.parent_place_id,
 
 290                              sa.sql.expression.label('osm_type', parent_type),
 
 291                              sa.sql.expression.label('osm_id', parent_id),
 
 292                              _interpolated_housenumber(inner),
 
 293                              _interpolated_position(inner),
 
 297         sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
 
 299         if self.has_geometries():
 
 300             sub = sql.subquery('geom')
 
 301             sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
 
 303         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 306     async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
 
 307         """ Find a street or POI/address for the given WKT point.
 
 309         log().section('Reverse lookup on street/address level')
 
 311         parent_place_id = None
 
 313         row = await self._find_closest_street_or_poi(distance)
 
 314         row_func: RowFunc = nres.create_from_placex_row
 
 315         log().var_dump('Result (street/building)', row)
 
 317         # If the closest result was a street, but an address was requested,
 
 318         # check for a housenumber nearby which is part of the street.
 
 320             if self.max_rank > 27 \
 
 321                and self.layer_enabled(DataLayer.ADDRESS) \
 
 322                and row.rank_address <= 27:
 
 324                 parent_place_id = row.place_id
 
 325                 log().comment('Find housenumber for street')
 
 326                 addr_row = await self._find_housenumber_for_street(parent_place_id)
 
 327                 log().var_dump('Result (street housenumber)', addr_row)
 
 329                 if addr_row is not None:
 
 331                     row_func = nres.create_from_placex_row
 
 332                     distance = addr_row.distance
 
 333                 elif row.country_code == 'us' and parent_place_id is not None:
 
 334                     log().comment('Find TIGER housenumber for street')
 
 335                     addr_row = await self._find_tiger_number_for_street(parent_place_id,
 
 338                     log().var_dump('Result (street Tiger housenumber)', addr_row)
 
 340                     if addr_row is not None:
 
 342                         row_func = nres.create_from_tiger_row
 
 344                 distance = row.distance
 
 346         # Check for an interpolation that is either closer than our result
 
 347         # or belongs to a close street found.
 
 348         if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
 
 349             log().comment('Find interpolation for street')
 
 350             addr_row = await self._find_interpolation_for_street(parent_place_id,
 
 352             log().var_dump('Result (street interpolation)', addr_row)
 
 353             if addr_row is not None:
 
 355                 row_func = nres.create_from_osmline_row
 
 360     async def _lookup_area_address(self) -> Optional[SaRow]:
 
 361         """ Lookup large addressable areas for the given WKT point.
 
 363         log().comment('Reverse lookup by larger address area features')
 
 364         t = self.conn.t.placex
 
 366         def _base_query() -> SaSelect:
 
 367             # The inner SQL brings results in the right order, so that
 
 368             # later only a minimum of results needs to be checked with ST_Contains.
 
 369             inner = sa.select(t, sa.literal(0.0).label('distance'))\
 
 370                       .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
 
 371                       .where(t.c.geometry.intersects(WKT_PARAM))\
 
 372                       .where(snfn.select_index_placex_geometry_reverse_lookuppolygon('placex'))\
 
 373                       .order_by(sa.desc(t.c.rank_search))\
 
 377             return _select_from_placex(inner, False)\
 
 378                       .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
 
 379                       .order_by(sa.desc(inner.c.rank_search))\
 
 382         sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
 
 383         if self.has_geometries():
 
 384             sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
 
 386         address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 387         log().var_dump('Result (area)', address_row)
 
 389         if address_row is not None and address_row.rank_search < self.max_rank:
 
 390             log().comment('Search for better matching place nodes inside the area')
 
 392             address_rank = address_row.rank_search
 
 393             address_id = address_row.place_id
 
 395             def _place_inside_area_query() -> SaSelect:
 
 398                               t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
 
 399                       .where(t.c.rank_search > address_rank)\
 
 400                       .where(t.c.rank_search <= MAX_RANK_PARAM)\
 
 401                       .where(t.c.indexed_status == 0)\
 
 402                       .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
 
 404                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
 
 405                                 .intersects(WKT_PARAM))\
 
 406                       .order_by(sa.desc(t.c.rank_search))\
 
 410                 touter = t.alias('outer')
 
 411                 return _select_from_placex(inner, False)\
 
 412                     .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
 
 413                     .where(touter.c.place_id == address_id)\
 
 414                     .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
 
 415                     .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 418             sql = sa.lambda_stmt(_place_inside_area_query)
 
 419             if self.has_geometries():
 
 420                 sql = self._add_geometry_columns(sql, sa.literal_column('places.geometry'))
 
 422             place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 423             log().var_dump('Result (place node)', place_address_row)
 
 425             if place_address_row is not None:
 
 426                 return place_address_row
 
 431     async def _lookup_area_others(self) -> Optional[SaRow]:
 
 432         t = self.conn.t.placex
 
 434         inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
 
 435                   .where(t.c.rank_address == 0)\
 
 436                   .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
 
 437                   .where(t.c.name != None)\
 
 438                   .where(t.c.indexed_status == 0)\
 
 439                   .where(t.c.linked_place_id == None)\
 
 440                   .where(self._filter_by_layer(t))\
 
 442                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
 
 443                                 .intersects(WKT_PARAM))\
 
 444                   .order_by(sa.desc(t.c.rank_search))\
 
 448         sql = _select_from_placex(inner, False)\
 
 449                   .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
 
 450                                 inner.c.geometry.ST_Contains(WKT_PARAM)))\
 
 451                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 454         if self.has_geometries():
 
 455             sql = self._add_geometry_columns(sql, inner.c.geometry)
 
 457         row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 458         log().var_dump('Result (non-address feature)', row)
 
 463     async def lookup_area(self) -> Optional[SaRow]:
 
 464         """ Lookup large areas for the current search.
 
 466         log().section('Reverse lookup by larger area features')
 
 468         if self.layer_enabled(DataLayer.ADDRESS):
 
 469             address_row = await self._lookup_area_address()
 
 473         if self.has_feature_layers():
 
 474             other_row = await self._lookup_area_others()
 
 478         return _get_closest(address_row, other_row)
 
 481     async def lookup_country(self) -> Optional[SaRow]:
 
 482         """ Lookup the country for the current search.
 
 484         log().section('Reverse lookup by country code')
 
 485         t = self.conn.t.country_grid
 
 486         sql: SaLambdaSelect = sa.select(t.c.country_code).distinct()\
 
 487                 .where(t.c.geometry.ST_Contains(WKT_PARAM))
 
 489         ccodes = tuple((r[0] for r in await self.conn.execute(sql, self.bind_params)))
 
 490         log().var_dump('Country codes', ccodes)
 
 495         t = self.conn.t.placex
 
 496         if self.max_rank > 4:
 
 497             log().comment('Search for place nodes in country')
 
 499             def _base_query() -> SaSelect:
 
 502                               t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
 
 503                       .where(t.c.rank_search > 4)\
 
 504                       .where(t.c.rank_search <= MAX_RANK_PARAM)\
 
 505                       .where(t.c.indexed_status == 0)\
 
 506                       .where(t.c.country_code.in_(ccodes))\
 
 507                       .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
 
 509                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
 
 510                                 .intersects(WKT_PARAM))\
 
 511                       .order_by(sa.desc(t.c.rank_search))\
 
 515                 return _select_from_placex(inner, False)\
 
 516                     .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
 
 517                     .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 520             sql = sa.lambda_stmt(_base_query)
 
 521             if self.has_geometries():
 
 522                 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
 
 524             address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 525             log().var_dump('Result (addressable place node)', address_row)
 
 529         if address_row is None:
 
 530             # Still nothing, then return a country with the appropriate country code.
 
 531             sql = sa.lambda_stmt(lambda: _select_from_placex(t)\
 
 532                       .where(t.c.country_code.in_(ccodes))\
 
 533                       .where(t.c.rank_address == 4)\
 
 534                       .where(t.c.rank_search == 4)\
 
 535                       .where(t.c.linked_place_id == None)\
 
 536                       .order_by('distance')\
 
 539             if self.has_geometries():
 
 540                 sql = self._add_geometry_columns(sql, t.c.geometry)
 
 542             address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 547     async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
 
 548         """ Look up a single coordinate. Returns the place information,
 
 549             if a place was found near the coordinates or None otherwise.
 
 551         log().function('reverse_lookup', coord=coord, params=self.params)
 
 554         self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
 
 556         row: Optional[SaRow] = None
 
 557         row_func: RowFunc = nres.create_from_placex_row
 
 559         if self.max_rank >= 26:
 
 560             row, tmp_row_func = await self.lookup_street_poi()
 
 562                 row_func = tmp_row_func
 
 563         if row is None and self.max_rank > 4:
 
 564             row = await self.lookup_area()
 
 565         if row is None and self.layer_enabled(DataLayer.ADDRESS):
 
 566             row = await self.lookup_country()
 
 568         result = row_func(row, nres.ReverseResult)
 
 569         if result is not None:
 
 570             assert row is not None
 
 571             result.distance = row.distance
 
 572             if hasattr(row, 'bbox'):
 
 573                 result.bbox = Bbox.from_wkb(row.bbox)
 
 574             await nres.add_result_details(self.conn, [result], self.params)