1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Implementation of reverse geocoding.
 
  10 from typing import Optional, List, Callable, Type, Tuple
 
  12 import sqlalchemy as sa
 
  13 from geoalchemy2 import WKTElement
 
  15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow
 
  16 from nominatim.api.connection import SearchConnection
 
  17 import nominatim.api.results as nres
 
  18 from nominatim.api.logging import log
 
  19 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
 
  21 # In SQLAlchemy expression which compare with NULL need to be expressed with
 
  23 # pylint: disable=singleton-comparison
 
  25 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
 
  27 def _select_from_placex(t: SaFromClause, wkt: Optional[str] = None) -> SaSelect:
 
  28     """ Create a select statement with the columns relevant for reverse
 
  32         distance = t.c.distance
 
  33         centroid = t.c.centroid
 
  35         distance = t.c.geometry.ST_Distance(wkt)
 
  37                        (t.c.geometry.ST_GeometryType().in_(('ST_LineString',
 
  38                                                            'ST_MultiLineString')),
 
  39                         t.c.geometry.ST_ClosestPoint(wkt)),
 
  40                        else_=t.c.centroid).label('centroid')
 
  43     return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
 
  45                      t.c.address, t.c.extratags,
 
  46                      t.c.housenumber, t.c.postcode, t.c.country_code,
 
  47                      t.c.importance, t.c.wikipedia,
 
  48                      t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
 
  50                      distance.label('distance'),
 
  51                      t.c.geometry.ST_Expand(0).label('bbox'))
 
  54 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
 
  55     return sa.cast(table.c.startnumber
 
  56                     + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
 
  57                                     / table.c.step) * table.c.step,
 
  58                    sa.Integer).label('housenumber')
 
  61 def _interpolated_position(table: SaFromClause) -> SaLabel:
 
  62     fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
 
  63     rounded_pos = sa.func.round(table.c.position / fac) * fac
 
  65              (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
 
  66               else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
 
  69 def _locate_interpolation(table: SaFromClause, wkt: WKTElement) -> SaLabel:
 
  70     """ Given a position, locate the closest point on the line.
 
  72     return sa.case((table.c.linegeo.ST_GeometryType() == 'ST_LineString',
 
  73                     sa.func.ST_LineLocatePoint(table.c.linegeo, wkt)),
 
  74                    else_=0).label('position')
 
  77 def _is_address_point(table: SaFromClause) -> SaColumn:
 
  78     return sa.and_(table.c.rank_address == 30,
 
  79                    sa.or_(table.c.housenumber != None,
 
  80                           table.c.name.has_key('housename')))
 
  82 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
 
  83     return min(rows, key=lambda row: 1000 if row is None else row.distance)
 
  85 class ReverseGeocoder:
 
  86     """ Class implementing the logic for looking up a place from a
 
  90     def __init__(self, conn: SearchConnection, params: ReverseDetails) -> None:
 
  96     def max_rank(self) -> int:
 
  97         """ Return the maximum configured rank.
 
  99         return self.params.max_rank
 
 102     def has_geometries(self) -> bool:
 
 103         """ Check if any geometries are requested.
 
 105         return bool(self.params.geometry_output)
 
 108     def layer_enabled(self, *layer: DataLayer) -> bool:
 
 109         """ Return true when any of the given layer types are requested.
 
 111         return any(self.params.layers & l for l in layer)
 
 114     def layer_disabled(self, *layer: DataLayer) -> bool:
 
 115         """ Return true when none of the given layer types is requested.
 
 117         return not any(self.params.layers & l for l in layer)
 
 120     def has_feature_layers(self) -> bool:
 
 121         """ Return true if any layer other than ADDRESS or POI is requested.
 
 123         return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
 
 125     def _add_geometry_columns(self, sql: SaSelect, col: SaColumn) -> SaSelect:
 
 126         if not self.has_geometries():
 
 131         if self.params.geometry_simplification > 0.0:
 
 132             col = col.ST_SimplifyPreserveTopology(self.params.geometry_simplification)
 
 134         if self.params.geometry_output & GeometryFormat.GEOJSON:
 
 135             out.append(col.ST_AsGeoJSON().label('geometry_geojson'))
 
 136         if self.params.geometry_output & GeometryFormat.TEXT:
 
 137             out.append(col.ST_AsText().label('geometry_text'))
 
 138         if self.params.geometry_output & GeometryFormat.KML:
 
 139             out.append(col.ST_AsKML().label('geometry_kml'))
 
 140         if self.params.geometry_output & GeometryFormat.SVG:
 
 141             out.append(col.ST_AsSVG().label('geometry_svg'))
 
 143         return sql.add_columns(*out)
 
 146     def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
 
 147         if self.layer_enabled(DataLayer.MANMADE):
 
 149             if self.layer_disabled(DataLayer.RAILWAY):
 
 150                 exclude.append('railway')
 
 151             if self.layer_disabled(DataLayer.NATURAL):
 
 152                 exclude.extend(('natural', 'water', 'waterway'))
 
 153             return table.c.class_.not_in(tuple(exclude))
 
 156         if self.layer_enabled(DataLayer.RAILWAY):
 
 157             include.append('railway')
 
 158         if self.layer_enabled(DataLayer.NATURAL):
 
 159             include.extend(('natural', 'water', 'waterway'))
 
 160         return table.c.class_.in_(tuple(include))
 
 163     async def _find_closest_street_or_poi(self, wkt: WKTElement,
 
 164                                           distance: float) -> Optional[SaRow]:
 
 165         """ Look up the closest rank 26+ place in the database, which
 
 166             is closer than the given distance.
 
 168         t = self.conn.t.placex
 
 170         sql = _select_from_placex(t, wkt)\
 
 171                 .where(t.c.geometry.ST_DWithin(wkt, distance))\
 
 172                 .where(t.c.indexed_status == 0)\
 
 173                 .where(t.c.linked_place_id == None)\
 
 174                 .where(sa.or_(t.c.geometry.ST_GeometryType()
 
 175                                           .not_in(('ST_Polygon', 'ST_MultiPolygon')),
 
 176                               t.c.centroid.ST_Distance(wkt) < distance))\
 
 177                 .order_by('distance')\
 
 180         sql = self._add_geometry_columns(sql, t.c.geometry)
 
 182         restrict: List[SaColumn] = []
 
 184         if self.layer_enabled(DataLayer.ADDRESS):
 
 185             restrict.append(sa.and_(t.c.rank_address >= 26,
 
 186                                     t.c.rank_address <= min(29, self.max_rank)))
 
 187             if self.max_rank == 30:
 
 188                 restrict.append(_is_address_point(t))
 
 189         if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
 
 190             restrict.append(sa.and_(t.c.rank_search == 30,
 
 191                                     t.c.class_.not_in(('place', 'building')),
 
 192                                     t.c.geometry.ST_GeometryType() != 'ST_LineString'))
 
 193         if self.has_feature_layers():
 
 194             restrict.append(sa.and_(t.c.rank_search.between(26, self.max_rank),
 
 195                                     t.c.rank_address == 0,
 
 196                                     self._filter_by_layer(t)))
 
 201         return (await self.conn.execute(sql.where(sa.or_(*restrict)))).one_or_none()
 
 204     async def _find_housenumber_for_street(self, parent_place_id: int,
 
 205                                            wkt: WKTElement) -> Optional[SaRow]:
 
 206         t = self.conn.t.placex
 
 208         sql = _select_from_placex(t, wkt)\
 
 209                 .where(t.c.geometry.ST_DWithin(wkt, 0.001))\
 
 210                 .where(t.c.parent_place_id == parent_place_id)\
 
 211                 .where(_is_address_point(t))\
 
 212                 .where(t.c.indexed_status == 0)\
 
 213                 .where(t.c.linked_place_id == None)\
 
 214                 .order_by('distance')\
 
 217         sql = self._add_geometry_columns(sql, t.c.geometry)
 
 219         return (await self.conn.execute(sql)).one_or_none()
 
 222     async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
 
 224                                              distance: float) -> Optional[SaRow]:
 
 225         t = self.conn.t.osmline
 
 228                         t.c.linegeo.ST_Distance(wkt).label('distance'),
 
 229                         _locate_interpolation(t, wkt))\
 
 230                 .where(t.c.linegeo.ST_DWithin(wkt, distance))\
 
 231                 .where(t.c.startnumber != None)\
 
 232                 .order_by('distance')\
 
 235         if parent_place_id is not None:
 
 236             sql = sql.where(t.c.parent_place_id == parent_place_id)
 
 238         inner = sql.subquery('ipol')
 
 240         sql = sa.select(inner.c.place_id, inner.c.osm_id,
 
 241                         inner.c.parent_place_id, inner.c.address,
 
 242                         _interpolated_housenumber(inner),
 
 243                         _interpolated_position(inner),
 
 244                         inner.c.postcode, inner.c.country_code,
 
 247         if self.has_geometries():
 
 248             sub = sql.subquery('geom')
 
 249             sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
 
 251         return (await self.conn.execute(sql)).one_or_none()
 
 254     async def _find_tiger_number_for_street(self, parent_place_id: int,
 
 255                                             parent_type: str, parent_id: int,
 
 256                                             wkt: WKTElement) -> Optional[SaRow]:
 
 257         t = self.conn.t.tiger
 
 260                           t.c.linegeo.ST_Distance(wkt).label('distance'),
 
 261                           _locate_interpolation(t, wkt))\
 
 262                   .where(t.c.linegeo.ST_DWithin(wkt, 0.001))\
 
 263                   .where(t.c.parent_place_id == parent_place_id)\
 
 264                   .order_by('distance')\
 
 268         sql = sa.select(inner.c.place_id,
 
 269                         inner.c.parent_place_id,
 
 270                         sa.literal(parent_type).label('osm_type'),
 
 271                         sa.literal(parent_id).label('osm_id'),
 
 272                         _interpolated_housenumber(inner),
 
 273                         _interpolated_position(inner),
 
 277         if self.has_geometries():
 
 278             sub = sql.subquery('geom')
 
 279             sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
 
 281         return (await self.conn.execute(sql)).one_or_none()
 
 284     async def lookup_street_poi(self,
 
 285                                 wkt: WKTElement) -> Tuple[Optional[SaRow], RowFunc]:
 
 286         """ Find a street or POI/address for the given WKT point.
 
 288         log().section('Reverse lookup on street/address level')
 
 290         parent_place_id = None
 
 292         row = await self._find_closest_street_or_poi(wkt, distance)
 
 293         row_func: RowFunc = nres.create_from_placex_row
 
 294         log().var_dump('Result (street/building)', row)
 
 296         # If the closest result was a street, but an address was requested,
 
 297         # check for a housenumber nearby which is part of the street.
 
 299             if self.max_rank > 27 \
 
 300                and self.layer_enabled(DataLayer.ADDRESS) \
 
 301                and row.rank_address <= 27:
 
 303                 parent_place_id = row.place_id
 
 304                 log().comment('Find housenumber for street')
 
 305                 addr_row = await self._find_housenumber_for_street(parent_place_id, wkt)
 
 306                 log().var_dump('Result (street housenumber)', addr_row)
 
 308                 if addr_row is not None:
 
 310                     row_func = nres.create_from_placex_row
 
 311                     distance = addr_row.distance
 
 312                 elif row.country_code == 'us' and parent_place_id is not None:
 
 313                     log().comment('Find TIGER housenumber for street')
 
 314                     addr_row = await self._find_tiger_number_for_street(parent_place_id,
 
 318                     log().var_dump('Result (street Tiger housenumber)', addr_row)
 
 320                     if addr_row is not None:
 
 322                         row_func = nres.create_from_tiger_row
 
 324                 distance = row.distance
 
 326         # Check for an interpolation that is either closer than our result
 
 327         # or belongs to a close street found.
 
 328         if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
 
 329             log().comment('Find interpolation for street')
 
 330             addr_row = await self._find_interpolation_for_street(parent_place_id,
 
 332             log().var_dump('Result (street interpolation)', addr_row)
 
 333             if addr_row is not None:
 
 335                 row_func = nres.create_from_osmline_row
 
 340     async def _lookup_area_address(self, wkt: WKTElement) -> Optional[SaRow]:
 
 341         """ Lookup large addressable areas for the given WKT point.
 
 343         log().comment('Reverse lookup by larger address area features')
 
 344         t = self.conn.t.placex
 
 346         # The inner SQL brings results in the right order, so that
 
 347         # later only a minimum of results needs to be checked with ST_Contains.
 
 348         inner = sa.select(t, sa.literal(0.0).label('distance'))\
 
 349                   .where(t.c.rank_search.between(5, self.max_rank))\
 
 350                   .where(t.c.rank_address.between(5, 25))\
 
 351                   .where(t.c.geometry.ST_GeometryType().in_(('ST_Polygon', 'ST_MultiPolygon')))\
 
 352                   .where(t.c.geometry.intersects(wkt))\
 
 353                   .where(t.c.name != None)\
 
 354                   .where(t.c.indexed_status == 0)\
 
 355                   .where(t.c.linked_place_id == None)\
 
 356                   .where(t.c.type != 'postcode')\
 
 357                   .order_by(sa.desc(t.c.rank_search))\
 
 361         sql = _select_from_placex(inner)\
 
 362                   .where(inner.c.geometry.ST_Contains(wkt))\
 
 363                   .order_by(sa.desc(inner.c.rank_search))\
 
 366         sql = self._add_geometry_columns(sql, inner.c.geometry)
 
 368         address_row = (await self.conn.execute(sql)).one_or_none()
 
 369         log().var_dump('Result (area)', address_row)
 
 371         if address_row is not None and address_row.rank_search < self.max_rank:
 
 372             log().comment('Search for better matching place nodes inside the area')
 
 374                               t.c.geometry.ST_Distance(wkt).label('distance'))\
 
 375                       .where(t.c.osm_type == 'N')\
 
 376                       .where(t.c.rank_search > address_row.rank_search)\
 
 377                       .where(t.c.rank_search <= self.max_rank)\
 
 378                       .where(t.c.rank_address.between(5, 25))\
 
 379                       .where(t.c.name != None)\
 
 380                       .where(t.c.indexed_status == 0)\
 
 381                       .where(t.c.linked_place_id == None)\
 
 382                       .where(t.c.type != 'postcode')\
 
 384                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
 
 386                       .order_by(sa.desc(t.c.rank_search))\
 
 390             touter = self.conn.t.placex.alias('outer')
 
 391             sql = _select_from_placex(inner)\
 
 392                   .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
 
 393                   .where(touter.c.place_id == address_row.place_id)\
 
 394                   .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
 
 395                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 398             sql = self._add_geometry_columns(sql, inner.c.geometry)
 
 400             place_address_row = (await self.conn.execute(sql)).one_or_none()
 
 401             log().var_dump('Result (place node)', place_address_row)
 
 403             if place_address_row is not None:
 
 404                 return place_address_row
 
 409     async def _lookup_area_others(self, wkt: WKTElement) -> Optional[SaRow]:
 
 410         t = self.conn.t.placex
 
 412         inner = sa.select(t, t.c.geometry.ST_Distance(wkt).label('distance'))\
 
 413                   .where(t.c.rank_address == 0)\
 
 414                   .where(t.c.rank_search.between(5, self.max_rank))\
 
 415                   .where(t.c.name != None)\
 
 416                   .where(t.c.indexed_status == 0)\
 
 417                   .where(t.c.linked_place_id == None)\
 
 418                   .where(self._filter_by_layer(t))\
 
 420                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
 
 422                   .order_by(sa.desc(t.c.rank_search))\
 
 426         sql = _select_from_placex(inner)\
 
 427                   .where(sa.or_(inner.c.geometry.ST_GeometryType()
 
 428                                                 .not_in(('ST_Polygon', 'ST_MultiPolygon')),
 
 429                                 inner.c.geometry.ST_Contains(wkt)))\
 
 430                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 433         sql = self._add_geometry_columns(sql, inner.c.geometry)
 
 435         row = (await self.conn.execute(sql)).one_or_none()
 
 436         log().var_dump('Result (non-address feature)', row)
 
 441     async def lookup_area(self, wkt: WKTElement) -> Optional[SaRow]:
 
 442         """ Lookup large areas for the given WKT point.
 
 444         log().section('Reverse lookup by larger area features')
 
 446         if self.layer_enabled(DataLayer.ADDRESS):
 
 447             address_row = await self._lookup_area_address(wkt)
 
 451         if self.has_feature_layers():
 
 452             other_row = await self._lookup_area_others(wkt)
 
 456         return _get_closest(address_row, other_row)
 
 459     async def lookup_country(self, wkt: WKTElement) -> Optional[SaRow]:
 
 460         """ Lookup the country for the given WKT point.
 
 462         log().section('Reverse lookup by country code')
 
 463         t = self.conn.t.country_grid
 
 464         sql = sa.select(t.c.country_code).distinct()\
 
 465                 .where(t.c.geometry.ST_Contains(wkt))
 
 467         ccodes = tuple((r[0] for r in await self.conn.execute(sql)))
 
 468         log().var_dump('Country codes', ccodes)
 
 473         t = self.conn.t.placex
 
 474         if self.max_rank > 4:
 
 475             log().comment('Search for place nodes in country')
 
 478                               t.c.geometry.ST_Distance(wkt).label('distance'))\
 
 479                       .where(t.c.osm_type == 'N')\
 
 480                       .where(t.c.rank_search > 4)\
 
 481                       .where(t.c.rank_search <= self.max_rank)\
 
 482                       .where(t.c.rank_address.between(5, 25))\
 
 483                       .where(t.c.name != None)\
 
 484                       .where(t.c.indexed_status == 0)\
 
 485                       .where(t.c.linked_place_id == None)\
 
 486                       .where(t.c.type != 'postcode')\
 
 487                       .where(t.c.country_code.in_(ccodes))\
 
 489                                 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
 
 491                       .order_by(sa.desc(t.c.rank_search))\
 
 495             sql = _select_from_placex(inner)\
 
 496                   .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
 
 497                   .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
 
 500             sql = self._add_geometry_columns(sql, inner.c.geometry)
 
 502             address_row = (await self.conn.execute(sql)).one_or_none()
 
 503             log().var_dump('Result (addressable place node)', address_row)
 
 507         if address_row is None:
 
 508             # Still nothing, then return a country with the appropriate country code.
 
 509             sql = _select_from_placex(t, wkt)\
 
 510                       .where(t.c.country_code.in_(ccodes))\
 
 511                       .where(t.c.rank_address == 4)\
 
 512                       .where(t.c.rank_search == 4)\
 
 513                       .where(t.c.linked_place_id == None)\
 
 514                       .order_by('distance')\
 
 517             sql = self._add_geometry_columns(sql, t.c.geometry)
 
 519             address_row = (await self.conn.execute(sql)).one_or_none()
 
 524     async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
 
 525         """ Look up a single coordinate. Returns the place information,
 
 526             if a place was found near the coordinates or None otherwise.
 
 528         log().function('reverse_lookup', coord=coord, params=self.params)
 
 531         wkt = WKTElement(f'POINT({coord[0]} {coord[1]})', srid=4326)
 
 533         row: Optional[SaRow] = None
 
 534         row_func: RowFunc = nres.create_from_placex_row
 
 536         if self.max_rank >= 26:
 
 537             row, tmp_row_func = await self.lookup_street_poi(wkt)
 
 539                 row_func = tmp_row_func
 
 540         if row is None and self.max_rank > 4:
 
 541             row = await self.lookup_area(wkt)
 
 542         if row is None and self.layer_enabled(DataLayer.ADDRESS):
 
 543             row = await self.lookup_country(wkt)
 
 545         result = row_func(row, nres.ReverseResult)
 
 546         if result is not None:
 
 547             assert row is not None
 
 548             result.distance = row.distance
 
 549             if hasattr(row, 'bbox'):
 
 550                 result.bbox = Bbox.from_wkb(row.bbox.data)
 
 551             await nres.add_result_details(self.conn, [result], self.params)