1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Implementation of reverse geocoding.
 
  10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
 
  13 import sqlalchemy as sa
 
  15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
 
  16                              SaBind, SaLambdaSelect
 
  17 from nominatim.api.connection import SearchConnection
 
  18 import nominatim.api.results as nres
 
  19 from nominatim.api.logging import log
 
  20 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
 
  21 from nominatim.db.sqlalchemy_types import Geometry
 
  23 # In SQLAlchemy expression which compare with NULL need to be expressed with
 
  25 # pylint: disable=singleton-comparison
 
  27 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
 
  29 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
 
  30 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
 
  32 def no_index(expr: SaColumn) -> SaColumn:
 
  33     """ Wrap the given expression, so that the query planner will
 
  34         refrain from using the expression for index lookup.
 
  36     return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
 
  39 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
 
  40     """ Create a select statement with the columns relevant for reverse
 
  44         distance = t.c.distance
 
  45         centroid = t.c.centroid
 
  47         distance = t.c.geometry.ST_Distance(WKT_PARAM)
 
  48         centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
 
  49                            else_=t.c.centroid).label('centroid')
 
  52     return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
 
  54                      t.c.address, t.c.extratags,
 
  55                      t.c.housenumber, t.c.postcode, t.c.country_code,
 
  56                      t.c.importance, t.c.wikipedia,
 
  57                      t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
 
  59                      t.c.linked_place_id, t.c.admin_level,
 
  60                      distance.label('distance'),
 
  61                      t.c.geometry.ST_Expand(0).label('bbox'))
 
  64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
 
  65     return sa.cast(table.c.startnumber
 
  66                     + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
 
  67                                     / table.c.step) * table.c.step,
 
  68                    sa.Integer).label('housenumber')
 
  71 def _interpolated_position(table: SaFromClause) -> SaLabel:
 
  72     fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
 
  73     rounded_pos = sa.func.round(table.c.position / fac) * fac
 
  75              (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
 
  76               else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
 
  79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
 
  80     """ Given a position, locate the closest point on the line.
 
  82     return sa.case((table.c.linegeo.is_line_like(),
 
  83                     table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
 
  84                    else_=0).label('position')
 
  87 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
 
  88     return min(rows, key=lambda row: 1000 if row is None else row.distance)
 
  91 class ReverseGeocoder:
 
  92     """ Class implementing the logic for looking up a place from a
 
  96     def __init__(self, conn: SearchConnection, params: ReverseDetails,
 
  97                  restrict_to_country_areas: bool = False) -> None:
 
 100         self.restrict_to_country_areas = restrict_to_country_areas
 
 102         self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
 
 106     def max_rank(self) -> int:
 
 107         """ Return the maximum configured rank.
 
 109         return self.params.max_rank
 
 112     def has_geometries(self) -> bool:
 
 113         """ Check if any geometries are requested.
 
 115         return bool(self.params.geometry_output)
 
 118     def layer_enabled(self, *layer: DataLayer) -> bool:
 
 119         """ Return true when any of the given layer types are requested.
 
 121         return any(self.params.layers & l for l in layer)
 
 124     def layer_disabled(self, *layer: DataLayer) -> bool:
 
 125         """ Return true when none of the given layer types is requested.
 
 127         return not any(self.params.layers & l for l in layer)
 
 130     def has_feature_layers(self) -> bool:
 
 131         """ Return true if any layer other than ADDRESS or POI is requested.
 
 133         return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
 
 136     def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
 
 139         if self.params.geometry_simplification > 0.0:
 
 140             col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
 
 142         if self.params.geometry_output & GeometryFormat.GEOJSON:
 
 143             out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
 
 144         if self.params.geometry_output & GeometryFormat.TEXT:
 
 145             out.append(sa.func.ST_AsText(col).label('geometry_text'))
 
 146         if self.params.geometry_output & GeometryFormat.KML:
 
 147             out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
 
 148         if self.params.geometry_output & GeometryFormat.SVG:
 
 149             out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
 
 151         return sql.add_columns(*out)
 
 154     def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
 
 155         if self.layer_enabled(DataLayer.MANMADE):
 
 157             if self.layer_disabled(DataLayer.RAILWAY):
 
 158                 exclude.append('railway')
 
 159             if self.layer_disabled(DataLayer.NATURAL):
 
 160                 exclude.extend(('natural', 'water', 'waterway'))
 
 161             return table.c.class_.not_in(tuple(exclude))
 
 164         if self.layer_enabled(DataLayer.RAILWAY):
 
 165             include.append('railway')
 
 166         if self.layer_enabled(DataLayer.NATURAL):
 
 167             include.extend(('natural', 'water', 'waterway'))
 
 168         return table.c.class_.in_(tuple(include))
 
 171     async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
 
 172         """ Look up the closest rank 26+ place in the database, which
 
 173             is closer than the given distance.
 
 175         t = self.conn.t.placex
 
 177         # PostgreSQL must not get the distance as a parameter because
 
 178         # there is a danger it won't be able to proberly estimate index use
 
 179         # when used with prepared statements
 
 180         diststr = sa.text(f"{distance}")
 
 182         sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
 
 183                 .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
 
 184                 .where(t.c.indexed_status == 0)
 
 185                 .where(t.c.linked_place_id == None)
 
 186                 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
 
 187                               t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
 
 188                 .order_by('distance')
 
 191         if self.has_geometries():
 
 192             sql = self._add_geometry_columns(sql, t.c.geometry)
 
 194         restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
 
 196         if self.layer_enabled(DataLayer.ADDRESS):
 
 197             max_rank = min(29, self.max_rank)
 
 198             restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
 
 199             if self.max_rank == 30:
 
 200                 restrict.append(lambda: sa.func.IsAddressPoint(t))
 
 201         if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
 
 202             restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
 
 203                                             t.c.class_.not_in(('place', 'building')),
 
 204                                             sa.not_(t.c.geometry.is_line_like())))
 
 205         if self.has_feature_layers():
 
 206             restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
 
 207                                     no_index(t.c.rank_address) == 0,
 
 208                                     self._filter_by_layer(t)))
 
 213         sql = sql.where(sa.or_(*restrict))
 
 215         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 218     async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
 
 219         t = self.conn.t.placex
 
 221         def _base_query() -> SaSelect:
 
 222             return _select_from_placex(t)\
 
 223                 .where(t.c.geometry.within_distance(WKT_PARAM, 0.001))\
 
 224                 .where(t.c.parent_place_id == parent_place_id)\
 
 225                 .where(sa.func.IsAddressPoint(t))\
 
 226                 .where(t.c.indexed_status == 0)\
 
 227                 .where(t.c.linked_place_id == None)\
 
 228                 .order_by('distance')\
 
 232         if self.has_geometries():
 
 233             sql = self._add_geometry_columns(_base_query(), t.c.geometry)
 
 235             sql = sa.lambda_stmt(_base_query)
 
 237         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 240     async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
 
 241                                              distance: float) -> Optional[SaRow]:
 
 242         t = self.conn.t.osmline
 
 245                         t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
 
 246                         _locate_interpolation(t))\
 
 247                 .where(t.c.linegeo.within_distance(WKT_PARAM, distance))\
 
 248                 .where(t.c.startnumber != None)\
 
 249                 .order_by('distance')\
 
 252         if parent_place_id is not None:
 
 253             sql = sql.where(t.c.parent_place_id == parent_place_id)
 
 255         inner = sql.subquery('ipol')
 
 257         sql = sa.select(inner.c.place_id, inner.c.osm_id,
 
 258                              inner.c.parent_place_id, inner.c.address,
 
 259                              _interpolated_housenumber(inner),
 
 260                              _interpolated_position(inner),
 
 261                              inner.c.postcode, inner.c.country_code,
 
 264         if self.has_geometries():
 
 265             sub = sql.subquery('geom')
 
 266             sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
 
 268         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 271     async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
 
 272         t = self.conn.t.tiger
 
 274         def _base_query() -> SaSelect:
 
 276                               t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
 
 277                               _locate_interpolation(t))\
 
 278                       .where(t.c.linegeo.within_distance(WKT_PARAM, 0.001))\
 
 279                       .where(t.c.parent_place_id == parent_place_id)\
 
 280                       .order_by('distance')\
 
 284             return sa.select(inner.c.place_id,
 
 285                              inner.c.parent_place_id,
 
 286                              _interpolated_housenumber(inner),
 
 287                              _interpolated_position(inner),
 
 292         if self.has_geometries():
 
 293             sub = _base_query().subquery('geom')
 
 294             sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
 
 296             sql = sa.lambda_stmt(_base_query)
 
 298         return (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 301     async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
 
 302         """ Find a street or POI/address for the given WKT point.
 
 304         log().section('Reverse lookup on street/address level')
 
 306         parent_place_id = None
 
 308         row = await self._find_closest_street_or_poi(distance)
 
 309         row_func: RowFunc = nres.create_from_placex_row
 
 310         log().var_dump('Result (street/building)', row)
 
 312         # If the closest result was a street, but an address was requested,
 
 313         # check for a housenumber nearby which is part of the street.
 
 315             if self.max_rank > 27 \
 
 316                and self.layer_enabled(DataLayer.ADDRESS) \
 
 317                and row.rank_address <= 27:
 
 319                 parent_place_id = row.place_id
 
 320                 log().comment('Find housenumber for street')
 
 321                 addr_row = await self._find_housenumber_for_street(parent_place_id)
 
 322                 log().var_dump('Result (street housenumber)', addr_row)
 
 324                 if addr_row is not None:
 
 326                     row_func = nres.create_from_placex_row
 
 327                     distance = addr_row.distance
 
 328                 elif row.country_code == 'us' and parent_place_id is not None:
 
 329                     log().comment('Find TIGER housenumber for street')
 
 330                     addr_row = await self._find_tiger_number_for_street(parent_place_id)
 
 331                     log().var_dump('Result (street Tiger housenumber)', addr_row)
 
 333                     if addr_row is not None:
 
 334                         row_func = cast(RowFunc,
 
 335                                         functools.partial(nres.create_from_tiger_row,
 
 336                                                           osm_type=row.osm_type,
 
 340                 distance = row.distance
 
 342         # Check for an interpolation that is either closer than our result
 
 343         # or belongs to a close street found.
 
 344         if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
 
 345             log().comment('Find interpolation for street')
 
 346             addr_row = await self._find_interpolation_for_street(parent_place_id,
 
 348             log().var_dump('Result (street interpolation)', addr_row)
 
 349             if addr_row is not None:
 
 351                 row_func = nres.create_from_osmline_row
 
 356     async def _lookup_area_address(self) -> Optional[SaRow]:
 
 357         """ Lookup large addressable areas for the given WKT point.
 
 359         log().comment('Reverse lookup by larger address area features')
 
 360         t = self.conn.t.placex
 
 362         def _base_query() -> SaSelect:
 
 363             # The inner SQL brings results in the right order, so that
 
 364             # later only a minimum of results needs to be checked with ST_Contains.
 
 365             inner = sa.select(t, sa.literal(0.0).label('distance'))\
 
 366                       .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
 
 367                       .where(t.c.geometry.intersects(WKT_PARAM))\
 
 368                       .where(sa.func.PlacexGeometryReverseLookuppolygon())\
 
 369                       .order_by(sa.desc(t.c.rank_search))\
 
 373             return _select_from_placex(inner, False)\
 
 374                       .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
 
 375                       .order_by(sa.desc(inner.c.rank_search))\
 
 378         sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
 
 379         if self.has_geometries():
 
 380             sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
 
 382         address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 383         log().var_dump('Result (area)', address_row)
 
 385         if address_row is not None and address_row.rank_search < self.max_rank:
 
 386             log().comment('Search for better matching place nodes inside the area')
 
 388             address_rank = address_row.rank_search
 
 389             address_id = address_row.place_id
 
 391             def _place_inside_area_query() -> SaSelect:
 
 394                               t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
 
 395                       .where(t.c.rank_search > address_rank)\
 
 396                       .where(t.c.rank_search <= MAX_RANK_PARAM)\
 
 397                       .where(t.c.indexed_status == 0)\
 
 398                       .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
 
 399                       .order_by(sa.desc(t.c.rank_search))\
 
 403                 touter = t.alias('outer')
 
 404                 return _select_from_placex(inner, False)\
 
 405                     .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
 
 406                     .where(touter.c.place_id == address_id)\
 
 407                     .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
 
 408                     .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 411             if self.has_geometries():
 
 412                 sql = self._add_geometry_columns(_place_inside_area_query(),
 
 413                                                  sa.literal_column('places.geometry'))
 
 415                 sql = sa.lambda_stmt(_place_inside_area_query)
 
 417             place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 418             log().var_dump('Result (place node)', place_address_row)
 
 420             if place_address_row is not None:
 
 421                 return place_address_row
 
 426     async def _lookup_area_others(self) -> Optional[SaRow]:
 
 427         t = self.conn.t.placex
 
 429         inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
 
 430                   .where(t.c.rank_address == 0)\
 
 431                   .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
 
 432                   .where(t.c.name != None)\
 
 433                   .where(t.c.indexed_status == 0)\
 
 434                   .where(t.c.linked_place_id == None)\
 
 435                   .where(self._filter_by_layer(t))\
 
 436                   .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
 
 437                   .order_by(sa.desc(t.c.rank_search))\
 
 438                   .order_by('distance')\
 
 442         sql = _select_from_placex(inner, False)\
 
 443                   .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
 
 444                                 inner.c.geometry.ST_Contains(WKT_PARAM)))\
 
 445                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 448         if self.has_geometries():
 
 449             sql = self._add_geometry_columns(sql, inner.c.geometry)
 
 451         row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 452         log().var_dump('Result (non-address feature)', row)
 
 457     async def lookup_area(self) -> Optional[SaRow]:
 
 458         """ Lookup large areas for the current search.
 
 460         log().section('Reverse lookup by larger area features')
 
 462         if self.layer_enabled(DataLayer.ADDRESS):
 
 463             address_row = await self._lookup_area_address()
 
 467         if self.has_feature_layers():
 
 468             other_row = await self._lookup_area_others()
 
 472         return _get_closest(address_row, other_row)
 
 475     async def lookup_country_codes(self) -> List[str]:
 
 476         """ Lookup the country for the current search.
 
 478         log().section('Reverse lookup by country code')
 
 479         t = self.conn.t.country_grid
 
 480         sql = sa.select(t.c.country_code).distinct()\
 
 481                 .where(t.c.geometry.ST_Contains(WKT_PARAM))
 
 483         ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
 
 484         log().var_dump('Country codes', ccodes)
 
 488     async def lookup_country(self, ccodes: List[str]) -> Optional[SaRow]:
 
 489         """ Lookup the country for the current search.
 
 492             ccodes = await self.lookup_country_codes()
 
 497         t = self.conn.t.placex
 
 498         if self.max_rank > 4:
 
 499             log().comment('Search for place nodes in country')
 
 501             def _base_query() -> SaSelect:
 
 504                               t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
 
 505                       .where(t.c.rank_search > 4)\
 
 506                       .where(t.c.rank_search <= MAX_RANK_PARAM)\
 
 507                       .where(t.c.indexed_status == 0)\
 
 508                       .where(t.c.country_code.in_(ccodes))\
 
 509                       .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
 
 510                       .order_by(sa.desc(t.c.rank_search))\
 
 514                 return _select_from_placex(inner, False)\
 
 515                     .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
 
 516                     .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 520             if self.has_geometries():
 
 521                 sql = self._add_geometry_columns(_base_query(),
 
 522                                                  sa.literal_column('area.geometry'))
 
 524                 sql = sa.lambda_stmt(_base_query)
 
 526             address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 527             log().var_dump('Result (addressable place node)', address_row)
 
 531         if address_row is None:
 
 532             # Still nothing, then return a country with the appropriate country code.
 
 533             def _country_base_query() -> SaSelect:
 
 534                 return _select_from_placex(t)\
 
 535                          .where(t.c.country_code.in_(ccodes))\
 
 536                          .where(t.c.rank_address == 4)\
 
 537                          .where(t.c.rank_search == 4)\
 
 538                          .where(t.c.linked_place_id == None)\
 
 539                          .order_by('distance')\
 
 542             if self.has_geometries():
 
 543                 sql = self._add_geometry_columns(_country_base_query(), t.c.geometry)
 
 545                 sql = sa.lambda_stmt(_country_base_query)
 
 547             address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
 
 552     async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
 
 553         """ Look up a single coordinate. Returns the place information,
 
 554             if a place was found near the coordinates or None otherwise.
 
 556         log().function('reverse_lookup', coord=coord, params=self.params)
 
 559         self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
 
 561         row: Optional[SaRow] = None
 
 562         row_func: RowFunc = nres.create_from_placex_row
 
 564         if self.max_rank >= 26:
 
 565             row, tmp_row_func = await self.lookup_street_poi()
 
 567                 row_func = tmp_row_func
 
 570             if self.restrict_to_country_areas:
 
 571                 ccodes = await self.lookup_country_codes()
 
 577             if self.max_rank > 4:
 
 578                 row = await self.lookup_area()
 
 579             if row is None and self.layer_enabled(DataLayer.ADDRESS):
 
 580                 row = await self.lookup_country(ccodes)
 
 582         result = row_func(row, nres.ReverseResult)
 
 583         if result is not None:
 
 584             assert row is not None
 
 585             result.distance = row.distance
 
 586             if hasattr(row, 'bbox'):
 
 587                 result.bbox = Bbox.from_wkb(row.bbox)
 
 588             await nres.add_result_details(self.conn, [result], self.params)