1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast, Union
13 import sqlalchemy as sa
15 from .typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow, \
16 SaBind, SaLambdaSelect
17 from .sql.sqlalchemy_types import Geometry
18 from .connection import SearchConnection
19 from . import results as nres
20 from .logging import log
21 from .types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
24 RowFunc = Callable[[SaRow, Type[nres.ReverseResult]], nres.ReverseResult]
26 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
27 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
30 def no_index(expr: SaColumn) -> SaColumn:
31 """ Wrap the given expression, so that the query planner will
32 refrain from using the expression for index lookup.
34 return sa.func.coalesce(sa.null(), expr)
37 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
38 """ Create a select statement with the columns relevant for reverse
42 distance = t.c.distance
43 centroid = t.c.centroid
45 distance = t.c.geometry.ST_Distance(WKT_PARAM)
46 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
47 else_=t.c.centroid).label('centroid')
49 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
51 t.c.address, t.c.extratags,
52 t.c.housenumber, t.c.postcode, t.c.country_code,
53 t.c.importance, t.c.wikipedia,
54 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
56 t.c.linked_place_id, t.c.admin_level,
57 distance.label('distance'),
58 t.c.geometry.ST_Expand(0).label('bbox'))
61 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
62 return sa.cast(table.c.startnumber
63 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
64 / table.c.step) * table.c.step,
65 sa.Integer).label('housenumber')
68 def _interpolated_position(table: SaFromClause) -> SaLabel:
69 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
70 rounded_pos = sa.func.round(table.c.position / fac) * fac
72 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
73 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
76 def _locate_interpolation(table: SaFromClause) -> SaLabel:
77 """ Given a position, locate the closest point on the line.
79 return sa.case((table.c.linegeo.is_line_like(),
80 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
81 else_=0).label('position')
84 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
85 return min(rows, key=lambda row: 1000 if row is None else row.distance)
88 class ReverseGeocoder:
89 """ Class implementing the logic for looking up a place from a
93 def __init__(self, conn: SearchConnection, params: ReverseDetails,
94 restrict_to_country_areas: bool = False) -> None:
97 self.restrict_to_country_areas = restrict_to_country_areas
99 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
102 def max_rank(self) -> int:
103 """ Return the maximum configured rank.
105 return self.params.max_rank
107 def has_geometries(self) -> bool:
108 """ Check if any geometries are requested.
110 return bool(self.params.geometry_output)
112 def layer_enabled(self, *layer: DataLayer) -> bool:
113 """ Return true when any of the given layer types are requested.
115 return any(self.params.layers & ly for ly in layer)
117 def layer_disabled(self, *layer: DataLayer) -> bool:
118 """ Return true when none of the given layer types is requested.
120 return not any(self.params.layers & ly for ly in layer)
122 def has_feature_layers(self) -> bool:
123 """ Return true if any layer other than ADDRESS or POI is requested.
125 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
127 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
130 if self.params.geometry_simplification > 0.0:
131 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
133 if self.params.geometry_output & GeometryFormat.GEOJSON:
134 out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
135 if self.params.geometry_output & GeometryFormat.TEXT:
136 out.append(sa.func.ST_AsText(col).label('geometry_text'))
137 if self.params.geometry_output & GeometryFormat.KML:
138 out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
139 if self.params.geometry_output & GeometryFormat.SVG:
140 out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
142 return sql.add_columns(*out)
144 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
145 if self.layer_enabled(DataLayer.MANMADE):
147 if self.layer_disabled(DataLayer.RAILWAY):
148 exclude.append('railway')
149 if self.layer_disabled(DataLayer.NATURAL):
150 exclude.extend(('natural', 'water', 'waterway'))
151 return table.c.class_.not_in(tuple(exclude))
154 if self.layer_enabled(DataLayer.RAILWAY):
155 include.append('railway')
156 if self.layer_enabled(DataLayer.NATURAL):
157 include.extend(('natural', 'water', 'waterway'))
158 return table.c.class_.in_(tuple(include))
160 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
161 """ Look up the closest rank 26+ place in the database, which
162 is closer than the given distance.
164 t = self.conn.t.placex
166 # PostgreSQL must not get the distance as a parameter because
167 # there is a danger it won't be able to properly estimate index use
168 # when used with prepared statements
169 diststr = sa.text(f"{distance}")
171 sql: SaLambdaSelect = sa.lambda_stmt(
172 lambda: _select_from_placex(t)
173 .where(t.c.geometry.within_distance(WKT_PARAM, diststr))
174 .where(t.c.indexed_status == 0)
175 .where(t.c.linked_place_id == None)
176 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
177 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
178 .order_by('distance')
181 if self.has_geometries():
182 sql = self._add_geometry_columns(sql, t.c.geometry)
184 restrict: List[Union[SaColumn, Callable[[], SaColumn]]] = []
186 if self.layer_enabled(DataLayer.ADDRESS):
187 max_rank = min(29, self.max_rank)
188 restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
189 if self.max_rank == 30:
190 restrict.append(lambda: sa.func.IsAddressPoint(t))
191 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
192 restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
193 t.c.class_.not_in(('place', 'building')),
194 sa.not_(t.c.geometry.is_line_like())))
195 if self.has_feature_layers():
196 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
197 no_index(t.c.rank_address) == 0,
198 self._filter_by_layer(t)))
203 sql = sql.where(sa.or_(*restrict))
205 # If the closest object is inside an area, then check if there is a
206 # POI node nearby and return that.
208 for row in await self.conn.execute(sql, self.bind_params):
210 if row.rank_search <= 27 or row.osm_type == 'N' or row.distance > 0:
214 if row.rank_search > 27 and row.osm_type == 'N'\
215 and row.distance < 0.0001:
220 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
221 t = self.conn.t.placex
223 def _base_query() -> SaSelect:
224 return _select_from_placex(t)\
225 .where(t.c.geometry.within_distance(WKT_PARAM, 0.001))\
226 .where(t.c.parent_place_id == parent_place_id)\
227 .where(sa.func.IsAddressPoint(t))\
228 .where(t.c.indexed_status == 0)\
229 .where(t.c.linked_place_id == None)\
230 .order_by('distance')\
234 if self.has_geometries():
235 sql = self._add_geometry_columns(_base_query(), t.c.geometry)
237 sql = sa.lambda_stmt(_base_query)
239 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
241 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
242 distance: float) -> Optional[SaRow]:
243 t = self.conn.t.osmline
246 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
247 _locate_interpolation(t))\
248 .where(t.c.linegeo.within_distance(WKT_PARAM, distance))\
249 .where(t.c.startnumber != None)\
250 .order_by('distance')\
253 if parent_place_id is not None:
254 sql = sql.where(t.c.parent_place_id == parent_place_id)
256 inner = sql.subquery('ipol')
258 sql = sa.select(inner.c.place_id, inner.c.osm_id,
259 inner.c.parent_place_id, inner.c.address,
260 _interpolated_housenumber(inner),
261 _interpolated_position(inner),
262 inner.c.postcode, inner.c.country_code,
265 if self.has_geometries():
266 sub = sql.subquery('geom')
267 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
269 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
271 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
272 t = self.conn.t.tiger
274 def _base_query() -> SaSelect:
276 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
277 _locate_interpolation(t))\
278 .where(t.c.linegeo.within_distance(WKT_PARAM, 0.001))\
279 .where(t.c.parent_place_id == parent_place_id)\
280 .order_by('distance')\
284 return sa.select(inner.c.place_id,
285 inner.c.parent_place_id,
286 _interpolated_housenumber(inner),
287 _interpolated_position(inner),
292 if self.has_geometries():
293 sub = _base_query().subquery('geom')
294 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
296 sql = sa.lambda_stmt(_base_query)
298 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
300 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
301 """ Find a street or POI/address for the given WKT point.
303 log().section('Reverse lookup on street/address level')
305 parent_place_id = None
307 row = await self._find_closest_street_or_poi(distance)
308 row_func: RowFunc = nres.create_from_placex_row
309 log().var_dump('Result (street/building)', row)
311 # If the closest result was a street, but an address was requested,
312 # check for a housenumber nearby which is part of the street.
314 if self.max_rank > 27 \
315 and self.layer_enabled(DataLayer.ADDRESS) \
316 and row.rank_address <= 27:
318 parent_place_id = row.place_id
319 log().comment('Find housenumber for street')
320 addr_row = await self._find_housenumber_for_street(parent_place_id)
321 log().var_dump('Result (street housenumber)', addr_row)
323 if addr_row is not None:
325 row_func = nres.create_from_placex_row
326 distance = addr_row.distance
327 elif row.country_code == 'us' and parent_place_id is not None:
328 log().comment('Find TIGER housenumber for street')
329 addr_row = await self._find_tiger_number_for_street(parent_place_id)
330 log().var_dump('Result (street Tiger housenumber)', addr_row)
332 if addr_row is not None:
333 row_func = cast(RowFunc,
334 functools.partial(nres.create_from_tiger_row,
335 osm_type=row.osm_type,
339 distance = row.distance
341 # Check for an interpolation that is either closer than our result
342 # or belongs to a close street found.
343 # No point in doing this when the result is already inside a building,
344 # i.e. when the distance is already 0.
345 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS) and distance > 0:
346 log().comment('Find interpolation for street')
347 addr_row = await self._find_interpolation_for_street(parent_place_id,
349 log().var_dump('Result (street interpolation)', addr_row)
350 if addr_row is not None:
352 row_func = nres.create_from_osmline_row
356 async def _lookup_area_address(self) -> Optional[SaRow]:
357 """ Lookup large addressable areas for the given WKT point.
359 log().comment('Reverse lookup by larger address area features')
360 t = self.conn.t.placex
362 def _base_query() -> SaSelect:
363 # The inner SQL brings results in the right order, so that
364 # later only a minimum of results needs to be checked with ST_Contains.
365 inner = sa.select(t, sa.literal(0.0).label('distance'))\
366 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
367 .where(t.c.rank_address != 5)\
368 .where(t.c.rank_address != 11)\
369 .where(t.c.geometry.intersects(WKT_PARAM))\
370 .where(sa.func.PlacexGeometryReverseLookuppolygon())\
371 .order_by(sa.desc(t.c.rank_search))\
375 return _select_from_placex(inner, False)\
376 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
377 .order_by(sa.desc(inner.c.rank_search))\
380 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
381 if self.has_geometries():
382 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
384 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
385 log().var_dump('Result (area)', address_row)
387 if address_row is not None and address_row.rank_search < self.max_rank:
388 log().comment('Search for better matching place nodes inside the area')
390 address_rank = address_row.rank_search
391 address_id = address_row.place_id
393 def _place_inside_area_query() -> SaSelect:
395 sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
396 .where(t.c.rank_search > address_rank)\
397 .where(t.c.rank_search <= MAX_RANK_PARAM)\
398 .where(t.c.indexed_status == 0)\
399 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
400 .order_by(sa.desc(t.c.rank_search))\
404 touter = t.alias('outer')
405 return _select_from_placex(inner, False)\
406 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
407 .where(touter.c.place_id == address_id)\
408 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
409 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
412 if self.has_geometries():
413 sql = self._add_geometry_columns(_place_inside_area_query(),
414 sa.literal_column('places.geometry'))
416 sql = sa.lambda_stmt(_place_inside_area_query)
418 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
419 log().var_dump('Result (place node)', place_address_row)
421 if place_address_row is not None:
422 return place_address_row
426 async def _lookup_area_others(self) -> Optional[SaRow]:
427 t = self.conn.t.placex
429 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
430 .where(t.c.rank_address == 0)\
431 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
432 .where(t.c.name != None)\
433 .where(t.c.indexed_status == 0)\
434 .where(t.c.linked_place_id == None)\
435 .where(self._filter_by_layer(t))\
436 .where(t.c.geometry.intersects(sa.func.ST_Expand(WKT_PARAM, 0.007)))\
437 .order_by(sa.desc(t.c.rank_search))\
438 .order_by('distance')\
442 sql = _select_from_placex(inner, False)\
443 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
444 inner.c.geometry.ST_Contains(WKT_PARAM)))\
445 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
448 if self.has_geometries():
449 sql = self._add_geometry_columns(sql, inner.c.geometry)
451 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
452 log().var_dump('Result (non-address feature)', row)
456 async def lookup_area(self) -> Optional[SaRow]:
457 """ Lookup large areas for the current search.
459 log().section('Reverse lookup by larger area features')
461 if self.layer_enabled(DataLayer.ADDRESS):
462 address_row = await self._lookup_area_address()
466 if self.has_feature_layers():
467 other_row = await self._lookup_area_others()
471 return _get_closest(address_row, other_row)
473 async def lookup_country_codes(self) -> List[str]:
474 """ Lookup the country for the current search.
476 log().section('Reverse lookup by country code')
477 t = self.conn.t.country_grid
478 sql = sa.select(t.c.country_code).distinct()\
479 .where(t.c.geometry.ST_Contains(WKT_PARAM))
481 ccodes = [cast(str, r[0]) for r in await self.conn.execute(sql, self.bind_params)]
482 log().var_dump('Country codes', ccodes)
485 async def lookup_country(self, ccodes: List[str]) -> Tuple[Optional[SaRow], RowFunc]:
486 """ Lookup the country for the current search.
488 row_func = nres.create_from_placex_row
490 ccodes = await self.lookup_country_codes()
493 return None, row_func
495 t = self.conn.t.placex
496 if self.max_rank > 4:
497 log().comment('Search for place nodes in country')
499 def _base_query() -> SaSelect:
500 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
501 .where(t.c.rank_search > 4)\
502 .where(t.c.rank_search <= MAX_RANK_PARAM)\
503 .where(t.c.indexed_status == 0)\
504 .where(t.c.country_code.in_(ccodes))\
505 .where(sa.func.IntersectsReverseDistance(t, WKT_PARAM))\
506 .order_by(sa.desc(t.c.rank_search))\
510 return _select_from_placex(inner, False)\
511 .where(sa.func.IsBelowReverseDistance(inner.c.distance, inner.c.rank_search))\
512 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
516 if self.has_geometries():
517 sql = self._add_geometry_columns(_base_query(),
518 sa.literal_column('area.geometry'))
520 sql = sa.lambda_stmt(_base_query)
522 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
523 log().var_dump('Result (addressable place node)', address_row)
527 if address_row is None:
528 # Still nothing, then return a country with the appropriate country code.
529 def _country_base_query() -> SaSelect:
530 return _select_from_placex(t)\
531 .where(t.c.country_code.in_(ccodes))\
532 .where(t.c.rank_address == 4)\
533 .where(t.c.rank_search == 4)\
534 .where(t.c.linked_place_id == None)\
535 .order_by('distance')\
538 if self.has_geometries():
539 sql = self._add_geometry_columns(_country_base_query(), t.c.geometry)
541 sql = sa.lambda_stmt(_country_base_query)
543 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
545 if address_row is None:
546 # finally fall back to country table
547 t = self.conn.t.country_name
548 tgrid = self.conn.t.country_grid
550 sql = sa.select(tgrid.c.country_code,
551 tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid()
553 tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\
554 .where(tgrid.c.country_code.in_(ccodes))\
555 .group_by(tgrid.c.country_code)
557 sub = sql.subquery('grid')
558 sql = sa.select(t.c.country_code,
559 t.c.name.merge(t.c.derived_name).label('name'),
560 sub.c.centroid, sub.c.bbox)\
561 .join(sub, t.c.country_code == sub.c.country_code)\
562 .order_by(t.c.country_code)\
565 sql = self._add_geometry_columns(sql, sub.c.centroid)
567 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
568 row_func = nres.create_from_country_row
570 return address_row, row_func
572 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
573 """ Look up a single coordinate. Returns the place information,
574 if a place was found near the coordinates or None otherwise.
576 log().function('reverse_lookup', coord=coord, params=self.params)
578 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
580 row: Optional[SaRow] = None
581 row_func: RowFunc = nres.create_from_placex_row
583 if self.max_rank >= 26:
584 row, tmp_row_func = await self.lookup_street_poi()
586 row_func = tmp_row_func
589 if self.restrict_to_country_areas:
590 ccodes = await self.lookup_country_codes()
596 if self.max_rank > 4:
597 row = await self.lookup_area()
598 if row is None and self.layer_enabled(DataLayer.ADDRESS):
599 row, row_func = await self.lookup_country(ccodes)
604 result = row_func(row, nres.ReverseResult)
605 result.distance = getattr(row, 'distance', 0)
606 if hasattr(row, 'bbox'):
607 result.bbox = Bbox.from_wkb(row.bbox)
608 await nres.add_result_details(self.conn, [result], self.params)