1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of reverse geocoding.
10 from typing import Optional, List, Callable, Type, Tuple, Dict, Any, cast
13 import sqlalchemy as sa
15 from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow,\
16 SaBind, SaLambdaSelect
17 from nominatim.api.connection import SearchConnection
18 import nominatim.api.results as nres
19 from nominatim.api.logging import log
20 from nominatim.api.types import AnyPoint, DataLayer, ReverseDetails, GeometryFormat, Bbox
21 from nominatim.db.sqlalchemy_types import Geometry
22 import nominatim.db.sqlalchemy_functions as snfn
24 # In SQLAlchemy expression which compare with NULL need to be expressed with
26 # pylint: disable=singleton-comparison
28 RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
30 WKT_PARAM: SaBind = sa.bindparam('wkt', type_=Geometry)
31 MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank')
33 def no_index(expr: SaColumn) -> SaColumn:
34 """ Wrap the given expression, so that the query planner will
35 refrain from using the expression for index lookup.
37 return sa.func.coalesce(sa.null(), expr) # pylint: disable=not-callable
40 def _select_from_placex(t: SaFromClause, use_wkt: bool = True) -> SaSelect:
41 """ Create a select statement with the columns relevant for reverse
45 distance = t.c.distance
46 centroid = t.c.centroid
48 distance = t.c.geometry.ST_Distance(WKT_PARAM)
49 centroid = sa.case((t.c.geometry.is_line_like(), t.c.geometry.ST_ClosestPoint(WKT_PARAM)),
50 else_=t.c.centroid).label('centroid')
53 return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
55 t.c.address, t.c.extratags,
56 t.c.housenumber, t.c.postcode, t.c.country_code,
57 t.c.importance, t.c.wikipedia,
58 t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
60 distance.label('distance'),
61 t.c.geometry.ST_Expand(0).label('bbox'))
64 def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
65 return sa.cast(table.c.startnumber
66 + sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
67 / table.c.step) * table.c.step,
68 sa.Integer).label('housenumber')
71 def _interpolated_position(table: SaFromClause) -> SaLabel:
72 fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
73 rounded_pos = sa.func.round(table.c.position / fac) * fac
75 (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
76 else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
79 def _locate_interpolation(table: SaFromClause) -> SaLabel:
80 """ Given a position, locate the closest point on the line.
82 return sa.case((table.c.linegeo.is_line_like(),
83 table.c.linegeo.ST_LineLocatePoint(WKT_PARAM)),
84 else_=0).label('position')
87 def _is_address_point(table: SaFromClause) -> SaColumn:
88 return sa.and_(table.c.rank_address == 30,
89 sa.or_(table.c.housenumber != None,
90 table.c.name.has_key('housename')))
93 def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
94 return min(rows, key=lambda row: 1000 if row is None else row.distance)
97 class ReverseGeocoder:
98 """ Class implementing the logic for looking up a place from a
102 def __init__(self, conn: SearchConnection, params: ReverseDetails) -> None:
106 self.bind_params: Dict[str, Any] = {'max_rank': params.max_rank}
110 def max_rank(self) -> int:
111 """ Return the maximum configured rank.
113 return self.params.max_rank
116 def has_geometries(self) -> bool:
117 """ Check if any geometries are requested.
119 return bool(self.params.geometry_output)
122 def layer_enabled(self, *layer: DataLayer) -> bool:
123 """ Return true when any of the given layer types are requested.
125 return any(self.params.layers & l for l in layer)
128 def layer_disabled(self, *layer: DataLayer) -> bool:
129 """ Return true when none of the given layer types is requested.
131 return not any(self.params.layers & l for l in layer)
134 def has_feature_layers(self) -> bool:
135 """ Return true if any layer other than ADDRESS or POI is requested.
137 return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
140 def _add_geometry_columns(self, sql: SaLambdaSelect, col: SaColumn) -> SaSelect:
143 if self.params.geometry_simplification > 0.0:
144 col = sa.func.ST_SimplifyPreserveTopology(col, self.params.geometry_simplification)
146 if self.params.geometry_output & GeometryFormat.GEOJSON:
147 out.append(sa.func.ST_AsGeoJSON(col).label('geometry_geojson'))
148 if self.params.geometry_output & GeometryFormat.TEXT:
149 out.append(sa.func.ST_AsText(col).label('geometry_text'))
150 if self.params.geometry_output & GeometryFormat.KML:
151 out.append(sa.func.ST_AsKML(col).label('geometry_kml'))
152 if self.params.geometry_output & GeometryFormat.SVG:
153 out.append(sa.func.ST_AsSVG(col).label('geometry_svg'))
155 return sql.add_columns(*out)
158 def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
159 if self.layer_enabled(DataLayer.MANMADE):
161 if self.layer_disabled(DataLayer.RAILWAY):
162 exclude.append('railway')
163 if self.layer_disabled(DataLayer.NATURAL):
164 exclude.extend(('natural', 'water', 'waterway'))
165 return table.c.class_.not_in(tuple(exclude))
168 if self.layer_enabled(DataLayer.RAILWAY):
169 include.append('railway')
170 if self.layer_enabled(DataLayer.NATURAL):
171 include.extend(('natural', 'water', 'waterway'))
172 return table.c.class_.in_(tuple(include))
175 async def _find_closest_street_or_poi(self, distance: float) -> Optional[SaRow]:
176 """ Look up the closest rank 26+ place in the database, which
177 is closer than the given distance.
179 t = self.conn.t.placex
181 # PostgreSQL must not get the distance as a parameter because
182 # there is a danger it won't be able to proberly estimate index use
183 # when used with prepared statements
184 diststr = sa.text(f"{distance}")
186 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
187 .where(t.c.geometry.ST_DWithin(WKT_PARAM, diststr))
188 .where(t.c.indexed_status == 0)
189 .where(t.c.linked_place_id == None)
190 .where(sa.or_(sa.not_(t.c.geometry.is_area()),
191 t.c.centroid.ST_Distance(WKT_PARAM) < diststr))
192 .order_by('distance')
195 if self.has_geometries():
196 sql = self._add_geometry_columns(sql, t.c.geometry)
198 restrict: List[SaColumn] = []
200 if self.layer_enabled(DataLayer.ADDRESS):
201 restrict.append(no_index(t.c.rank_address).between(26, min(29, self.max_rank)))
202 if self.max_rank == 30:
203 restrict.append(_is_address_point(t))
204 if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
205 restrict.append(sa.and_(no_index(t.c.rank_search) == 30,
206 t.c.class_.not_in(('place', 'building')),
207 sa.not_(t.c.geometry.is_line_like())))
208 if self.has_feature_layers():
209 restrict.append(sa.and_(no_index(t.c.rank_search).between(26, MAX_RANK_PARAM),
210 no_index(t.c.rank_address) == 0,
211 self._filter_by_layer(t)))
216 sql = sql.where(sa.or_(*restrict))
218 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
221 async def _find_housenumber_for_street(self, parent_place_id: int) -> Optional[SaRow]:
222 t = self.conn.t.placex
224 sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
225 .where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))
226 .where(t.c.parent_place_id == parent_place_id)
227 .where(_is_address_point(t))
228 .where(t.c.indexed_status == 0)
229 .where(t.c.linked_place_id == None)
230 .order_by('distance')
233 if self.has_geometries():
234 sql = self._add_geometry_columns(sql, t.c.geometry)
236 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
239 async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
240 distance: float) -> Optional[SaRow]:
241 t = self.conn.t.osmline
243 sql: Any = sa.lambda_stmt(lambda:
245 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
246 _locate_interpolation(t))
247 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, distance))
248 .where(t.c.startnumber != None)
249 .order_by('distance')
252 if parent_place_id is not None:
253 sql += lambda s: s.where(t.c.parent_place_id == parent_place_id)
255 def _wrap_query(base_sql: SaLambdaSelect) -> SaSelect:
256 inner = base_sql.subquery('ipol')
258 return sa.select(inner.c.place_id, inner.c.osm_id,
259 inner.c.parent_place_id, inner.c.address,
260 _interpolated_housenumber(inner),
261 _interpolated_position(inner),
262 inner.c.postcode, inner.c.country_code,
267 if self.has_geometries():
268 sub = sql.subquery('geom')
269 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
271 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
274 async def _find_tiger_number_for_street(self, parent_place_id: int) -> Optional[SaRow]:
275 t = self.conn.t.tiger
277 def _base_query() -> SaSelect:
279 t.c.linegeo.ST_Distance(WKT_PARAM).label('distance'),
280 _locate_interpolation(t))\
281 .where(t.c.linegeo.ST_DWithin(WKT_PARAM, 0.001))\
282 .where(t.c.parent_place_id == parent_place_id)\
283 .order_by('distance')\
287 return sa.select(inner.c.place_id,
288 inner.c.parent_place_id,
289 _interpolated_housenumber(inner),
290 _interpolated_position(inner),
294 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
296 if self.has_geometries():
297 sub = sql.subquery('geom')
298 sql = self._add_geometry_columns(sa.select(sub), sub.c.centroid)
300 return (await self.conn.execute(sql, self.bind_params)).one_or_none()
303 async def lookup_street_poi(self) -> Tuple[Optional[SaRow], RowFunc]:
304 """ Find a street or POI/address for the given WKT point.
306 log().section('Reverse lookup on street/address level')
308 parent_place_id = None
310 row = await self._find_closest_street_or_poi(distance)
311 row_func: RowFunc = nres.create_from_placex_row
312 log().var_dump('Result (street/building)', row)
314 # If the closest result was a street, but an address was requested,
315 # check for a housenumber nearby which is part of the street.
317 if self.max_rank > 27 \
318 and self.layer_enabled(DataLayer.ADDRESS) \
319 and row.rank_address <= 27:
321 parent_place_id = row.place_id
322 log().comment('Find housenumber for street')
323 addr_row = await self._find_housenumber_for_street(parent_place_id)
324 log().var_dump('Result (street housenumber)', addr_row)
326 if addr_row is not None:
328 row_func = nres.create_from_placex_row
329 distance = addr_row.distance
330 elif row.country_code == 'us' and parent_place_id is not None:
331 log().comment('Find TIGER housenumber for street')
332 addr_row = await self._find_tiger_number_for_street(parent_place_id)
333 log().var_dump('Result (street Tiger housenumber)', addr_row)
335 if addr_row is not None:
336 row_func = cast(RowFunc,
337 functools.partial(nres.create_from_tiger_row,
338 osm_type=row.osm_type,
342 distance = row.distance
344 # Check for an interpolation that is either closer than our result
345 # or belongs to a close street found.
346 if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
347 log().comment('Find interpolation for street')
348 addr_row = await self._find_interpolation_for_street(parent_place_id,
350 log().var_dump('Result (street interpolation)', addr_row)
351 if addr_row is not None:
353 row_func = nres.create_from_osmline_row
358 async def _lookup_area_address(self) -> Optional[SaRow]:
359 """ Lookup large addressable areas for the given WKT point.
361 log().comment('Reverse lookup by larger address area features')
362 t = self.conn.t.placex
364 def _base_query() -> SaSelect:
365 # The inner SQL brings results in the right order, so that
366 # later only a minimum of results needs to be checked with ST_Contains.
367 inner = sa.select(t, sa.literal(0.0).label('distance'))\
368 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
369 .where(t.c.geometry.intersects(WKT_PARAM))\
370 .where(snfn.select_index_placex_geometry_reverse_lookuppolygon('placex'))\
371 .order_by(sa.desc(t.c.rank_search))\
375 return _select_from_placex(inner, False)\
376 .where(inner.c.geometry.ST_Contains(WKT_PARAM))\
377 .order_by(sa.desc(inner.c.rank_search))\
380 sql: SaLambdaSelect = sa.lambda_stmt(_base_query)
381 if self.has_geometries():
382 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
384 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
385 log().var_dump('Result (area)', address_row)
387 if address_row is not None and address_row.rank_search < self.max_rank:
388 log().comment('Search for better matching place nodes inside the area')
390 address_rank = address_row.rank_search
391 address_id = address_row.place_id
393 def _place_inside_area_query() -> SaSelect:
396 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
397 .where(t.c.rank_search > address_rank)\
398 .where(t.c.rank_search <= MAX_RANK_PARAM)\
399 .where(t.c.indexed_status == 0)\
400 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
402 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
403 .intersects(WKT_PARAM))\
404 .order_by(sa.desc(t.c.rank_search))\
408 touter = t.alias('outer')
409 return _select_from_placex(inner, False)\
410 .join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
411 .where(touter.c.place_id == address_id)\
412 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
413 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
416 sql = sa.lambda_stmt(_place_inside_area_query)
417 if self.has_geometries():
418 sql = self._add_geometry_columns(sql, sa.literal_column('places.geometry'))
420 place_address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
421 log().var_dump('Result (place node)', place_address_row)
423 if place_address_row is not None:
424 return place_address_row
429 async def _lookup_area_others(self) -> Optional[SaRow]:
430 t = self.conn.t.placex
432 inner = sa.select(t, t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
433 .where(t.c.rank_address == 0)\
434 .where(t.c.rank_search.between(5, MAX_RANK_PARAM))\
435 .where(t.c.name != None)\
436 .where(t.c.indexed_status == 0)\
437 .where(t.c.linked_place_id == None)\
438 .where(self._filter_by_layer(t))\
440 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
441 .intersects(WKT_PARAM))\
442 .order_by(sa.desc(t.c.rank_search))\
446 sql = _select_from_placex(inner, False)\
447 .where(sa.or_(sa.not_(inner.c.geometry.is_area()),
448 inner.c.geometry.ST_Contains(WKT_PARAM)))\
449 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
452 if self.has_geometries():
453 sql = self._add_geometry_columns(sql, inner.c.geometry)
455 row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
456 log().var_dump('Result (non-address feature)', row)
461 async def lookup_area(self) -> Optional[SaRow]:
462 """ Lookup large areas for the current search.
464 log().section('Reverse lookup by larger area features')
466 if self.layer_enabled(DataLayer.ADDRESS):
467 address_row = await self._lookup_area_address()
471 if self.has_feature_layers():
472 other_row = await self._lookup_area_others()
476 return _get_closest(address_row, other_row)
479 async def lookup_country(self) -> Optional[SaRow]:
480 """ Lookup the country for the current search.
482 log().section('Reverse lookup by country code')
483 t = self.conn.t.country_grid
484 sql: SaLambdaSelect = sa.select(t.c.country_code).distinct()\
485 .where(t.c.geometry.ST_Contains(WKT_PARAM))
487 ccodes = tuple((r[0] for r in await self.conn.execute(sql, self.bind_params)))
488 log().var_dump('Country codes', ccodes)
493 t = self.conn.t.placex
494 if self.max_rank > 4:
495 log().comment('Search for place nodes in country')
497 def _base_query() -> SaSelect:
500 t.c.geometry.ST_Distance(WKT_PARAM).label('distance'))\
501 .where(t.c.rank_search > 4)\
502 .where(t.c.rank_search <= MAX_RANK_PARAM)\
503 .where(t.c.indexed_status == 0)\
504 .where(t.c.country_code.in_(ccodes))\
505 .where(snfn.select_index_placex_geometry_reverse_lookupplacenode('placex'))\
507 .ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
508 .intersects(WKT_PARAM))\
509 .order_by(sa.desc(t.c.rank_search))\
513 return _select_from_placex(inner, False)\
514 .where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
515 .order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
518 sql = sa.lambda_stmt(_base_query)
519 if self.has_geometries():
520 sql = self._add_geometry_columns(sql, sa.literal_column('area.geometry'))
522 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
523 log().var_dump('Result (addressable place node)', address_row)
527 if address_row is None:
528 # Still nothing, then return a country with the appropriate country code.
529 sql = sa.lambda_stmt(lambda: _select_from_placex(t)\
530 .where(t.c.country_code.in_(ccodes))\
531 .where(t.c.rank_address == 4)\
532 .where(t.c.rank_search == 4)\
533 .where(t.c.linked_place_id == None)\
534 .order_by('distance')\
537 if self.has_geometries():
538 sql = self._add_geometry_columns(sql, t.c.geometry)
540 address_row = (await self.conn.execute(sql, self.bind_params)).one_or_none()
545 async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
546 """ Look up a single coordinate. Returns the place information,
547 if a place was found near the coordinates or None otherwise.
549 log().function('reverse_lookup', coord=coord, params=self.params)
552 self.bind_params['wkt'] = f'POINT({coord[0]} {coord[1]})'
554 row: Optional[SaRow] = None
555 row_func: RowFunc = nres.create_from_placex_row
557 if self.max_rank >= 26:
558 row, tmp_row_func = await self.lookup_street_poi()
560 row_func = tmp_row_func
561 if row is None and self.max_rank > 4:
562 row = await self.lookup_area()
563 if row is None and self.layer_enabled(DataLayer.ADDRESS):
564 row = await self.lookup_country()
566 result = row_func(row, nres.ReverseResult)
567 if result is not None:
568 assert row is not None
569 result.distance = row.distance
570 if hasattr(row, 'bbox'):
571 result.bbox = Bbox.from_wkb(row.bbox)
572 await nres.add_result_details(self.conn, [result], self.params)