From 11d624e92abc8d2d3aa5f53f1f64fedf4efb1aee Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Tue, 1 Jul 2025 22:57:04 +0200 Subject: [PATCH] split db_searches moving each class in its own file --- src/nominatim_api/search/db_searches.py | 867 ------------------ .../search/db_searches/__init__.py | 16 + src/nominatim_api/search/db_searches/base.py | 144 +++ .../search/db_searches/country_search.py | 119 +++ .../search/db_searches/near_search.py | 136 +++ .../search/db_searches/place_search.py | 385 ++++++++ .../search/db_searches/poi_search.py | 114 +++ .../search/db_searches/postcode_search.py | 129 +++ 8 files changed, 1043 insertions(+), 867 deletions(-) delete mode 100644 src/nominatim_api/search/db_searches.py create mode 100644 src/nominatim_api/search/db_searches/__init__.py create mode 100644 src/nominatim_api/search/db_searches/base.py create mode 100644 src/nominatim_api/search/db_searches/country_search.py create mode 100644 src/nominatim_api/search/db_searches/near_search.py create mode 100644 src/nominatim_api/search/db_searches/place_search.py create mode 100644 src/nominatim_api/search/db_searches/poi_search.py create mode 100644 src/nominatim_api/search/db_searches/postcode_search.py diff --git a/src/nominatim_api/search/db_searches.py b/src/nominatim_api/search/db_searches.py deleted file mode 100644 index 8e37c748..00000000 --- a/src/nominatim_api/search/db_searches.py +++ /dev/null @@ -1,867 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later -# -# This file is part of Nominatim. (https://nominatim.org) -# -# Copyright (C) 2024 by the Nominatim developer community. -# For a full list of authors see the git log. -""" -Implementation of the actual database accesses for forward search. 
-""" -from typing import List, Tuple, AsyncIterator, Dict, Any, Callable, cast -import abc - -import sqlalchemy as sa - -from ..typing import SaFromClause, SaScalarSelect, SaColumn, \ - SaExpression, SaSelect, SaLambdaSelect, SaRow, SaBind -from ..sql.sqlalchemy_types import Geometry, IntArray -from ..connection import SearchConnection -from ..types import SearchDetails, DataLayer, GeometryFormat, Bbox -from .. import results as nres -from .db_search_fields import SearchData, WeightedCategories - - -def no_index(expr: SaColumn) -> SaColumn: - """ Wrap the given expression, so that the query planner will - refrain from using the expression for index lookup. - """ - return sa.func.coalesce(sa.null(), expr) - - -def _details_to_bind_params(details: SearchDetails) -> Dict[str, Any]: - """ Create a dictionary from search parameters that can be used - as bind parameter for SQL execute. - """ - return {'limit': details.max_results, - 'min_rank': details.min_rank, - 'max_rank': details.max_rank, - 'viewbox': details.viewbox, - 'viewbox2': details.viewbox_x2, - 'near': details.near, - 'near_radius': details.near_radius, - 'excluded': details.excluded, - 'countries': details.countries} - - -LIMIT_PARAM: SaBind = sa.bindparam('limit') -MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank') -MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank') -VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry) -VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry) -NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry) -NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius') -COUNTRIES_PARAM: SaBind = sa.bindparam('countries') - - -def filter_by_area(sql: SaSelect, t: SaFromClause, - details: SearchDetails, avoid_index: bool = False) -> SaSelect: - """ Apply SQL statements for filtering by viewbox and near point, - if applicable. 
- """ - if details.near is not None and details.near_radius is not None: - if details.near_radius < 0.1 and not avoid_index: - sql = sql.where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)) - else: - sql = sql.where(t.c.geometry.ST_Distance(NEAR_PARAM) <= NEAR_RADIUS_PARAM) - if details.viewbox is not None and details.bounded_viewbox: - sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM, - use_index=not avoid_index and - details.viewbox.area < 0.2)) - - return sql - - -def _exclude_places(t: SaFromClause) -> Callable[[], SaExpression]: - return lambda: t.c.place_id.not_in(sa.bindparam('excluded')) - - -def _select_placex(t: SaFromClause) -> SaSelect: - return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name, - t.c.class_, t.c.type, - t.c.address, t.c.extratags, - t.c.housenumber, t.c.postcode, t.c.country_code, - t.c.wikipedia, - t.c.parent_place_id, t.c.rank_address, t.c.rank_search, - t.c.linked_place_id, t.c.admin_level, - t.c.centroid, - t.c.geometry.ST_Expand(0).label('bbox')) - - -def _add_geometry_columns(sql: SaLambdaSelect, col: SaColumn, details: SearchDetails) -> SaSelect: - out = [] - - if details.geometry_simplification > 0.0: - col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification) - - if details.geometry_output & GeometryFormat.GEOJSON: - out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson')) - if details.geometry_output & GeometryFormat.TEXT: - out.append(sa.func.ST_AsText(col).label('geometry_text')) - if details.geometry_output & GeometryFormat.KML: - out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml')) - if details.geometry_output & GeometryFormat.SVG: - out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg')) - - return sql.add_columns(*out) - - -def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause, - numerals: List[int], details: SearchDetails) -> SaScalarSelect: - all_ids = sa.func.ArrayAgg(table.c.place_id) - sql = 
sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id) - - if len(numerals) == 1: - sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\ - .where((numerals[0] - table.c.startnumber) % table.c.step == 0) - else: - sql = sql.where(sa.or_( - *(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber), - (n - table.c.startnumber) % table.c.step == 0) - for n in numerals))) - - if details.excluded: - sql = sql.where(_exclude_places(table)) - - return sql.scalar_subquery() - - -def _filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn: - orexpr: List[SaExpression] = [] - if layers & DataLayer.ADDRESS and layers & DataLayer.POI: - orexpr.append(no_index(table.c.rank_address).between(1, 30)) - elif layers & DataLayer.ADDRESS: - orexpr.append(no_index(table.c.rank_address).between(1, 29)) - orexpr.append(sa.func.IsAddressPoint(table)) - elif layers & DataLayer.POI: - orexpr.append(sa.and_(no_index(table.c.rank_address) == 30, - table.c.class_.not_in(('place', 'building')))) - - if layers & DataLayer.MANMADE: - exclude = [] - if not layers & DataLayer.RAILWAY: - exclude.append('railway') - if not layers & DataLayer.NATURAL: - exclude.extend(('natural', 'water', 'waterway')) - orexpr.append(sa.and_(table.c.class_.not_in(tuple(exclude)), - no_index(table.c.rank_address) == 0)) - else: - include = [] - if layers & DataLayer.RAILWAY: - include.append('railway') - if layers & DataLayer.NATURAL: - include.extend(('natural', 'water', 'waterway')) - orexpr.append(sa.and_(table.c.class_.in_(tuple(include)), - no_index(table.c.rank_address) == 0)) - - if len(orexpr) == 1: - return orexpr[0] - - return sa.or_(*orexpr) - - -def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn: - pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber) - return sa.case( - (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()), - 
else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid') - - -async def _get_placex_housenumbers(conn: SearchConnection, - place_ids: List[int], - details: SearchDetails) -> AsyncIterator[nres.SearchResult]: - t = conn.t.placex - sql = _select_placex(t).add_columns(t.c.importance)\ - .where(t.c.place_id.in_(place_ids)) - - if details.geometry_output: - sql = _add_geometry_columns(sql, t.c.geometry, details) - - for row in await conn.execute(sql): - result = nres.create_from_placex_row(row, nres.SearchResult) - assert result - result.bbox = Bbox.from_wkb(row.bbox) - yield result - - -def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery': - """ Create a subselect that returns the given list of integers - as rows in the column 'nr'. - """ - vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\ - .table_valued(sa.column('value', type_=sa.JSON)) - return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery() - - -async def _get_osmline(conn: SearchConnection, place_ids: List[int], - numerals: List[int], - details: SearchDetails) -> AsyncIterator[nres.SearchResult]: - t = conn.t.osmline - - values = _int_list_to_subquery(numerals) - sql = sa.select(t.c.place_id, t.c.osm_id, - t.c.parent_place_id, t.c.address, - values.c.nr.label('housenumber'), - _interpolated_position(t, values.c.nr), - t.c.postcode, t.c.country_code)\ - .where(t.c.place_id.in_(place_ids))\ - .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber)) - - if details.geometry_output: - sub = sql.subquery() - sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details) - - for row in await conn.execute(sql): - result = nres.create_from_osmline_row(row, nres.SearchResult) - assert result - yield result - - -async def _get_tiger(conn: SearchConnection, place_ids: List[int], - numerals: List[int], osm_id: int, - details: SearchDetails) -> AsyncIterator[nres.SearchResult]: - t = conn.t.tiger - values = _int_list_to_subquery(numerals) - sql 
= sa.select(t.c.place_id, t.c.parent_place_id, - sa.literal('W').label('osm_type'), - sa.literal(osm_id).label('osm_id'), - values.c.nr.label('housenumber'), - _interpolated_position(t, values.c.nr), - t.c.postcode)\ - .where(t.c.place_id.in_(place_ids))\ - .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber)) - - if details.geometry_output: - sub = sql.subquery() - sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details) - - for row in await conn.execute(sql): - result = nres.create_from_tiger_row(row, nres.SearchResult) - assert result - yield result - - -class AbstractSearch(abc.ABC): - """ Encapuslation of a single lookup in the database. - """ - SEARCH_PRIO: int = 2 - - def __init__(self, penalty: float) -> None: - self.penalty = penalty - - @abc.abstractmethod - async def lookup(self, conn: SearchConnection, - details: SearchDetails) -> nres.SearchResults: - """ Find results for the search in the database. - """ - - -class NearSearch(AbstractSearch): - """ Category search of a place type near the result of another search. - """ - def __init__(self, penalty: float, categories: WeightedCategories, - search: AbstractSearch) -> None: - super().__init__(penalty) - self.search = search - self.categories = categories - - async def lookup(self, conn: SearchConnection, - details: SearchDetails) -> nres.SearchResults: - """ Find results for the search in the database. 
- """ - results = nres.SearchResults() - base = await self.search.lookup(conn, details) - - if not base: - return results - - base.sort(key=lambda r: (r.accuracy, r.rank_search)) - max_accuracy = base[0].accuracy + 0.5 - if base[0].rank_address == 0: - min_rank = 0 - max_rank = 0 - elif base[0].rank_address < 26: - min_rank = 1 - max_rank = min(25, base[0].rank_address + 4) - else: - min_rank = 26 - max_rank = 30 - base = nres.SearchResults(r for r in base - if (r.source_table == nres.SourceTable.PLACEX - and r.accuracy <= max_accuracy - and r.bbox and r.bbox.area < 20 - and r.rank_address >= min_rank - and r.rank_address <= max_rank)) - - if base: - baseids = [b.place_id for b in base[:5] if b.place_id] - - for category, penalty in self.categories: - await self.lookup_category(results, conn, baseids, category, penalty, details) - if len(results) >= details.max_results: - break - - return results - - async def lookup_category(self, results: nres.SearchResults, - conn: SearchConnection, ids: List[int], - category: Tuple[str, str], penalty: float, - details: SearchDetails) -> None: - """ Find places of the given category near the list of - place ids and add the results to 'results'. - """ - table = await conn.get_class_table(*category) - - tgeom = conn.t.placex.alias('pgeom') - - if table is None: - # No classtype table available, do a simplified lookup in placex. - table = conn.t.placex - sql = sa.select(table.c.place_id, - sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid)) - .label('dist'))\ - .join(tgeom, table.c.geometry.intersects(tgeom.c.centroid.ST_Expand(0.01)))\ - .where(table.c.class_ == category[0])\ - .where(table.c.type == category[1]) - else: - # Use classtype table. We can afford to use a larger - # radius for the lookup. 
- sql = sa.select(table.c.place_id, - sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid)) - .label('dist'))\ - .join(tgeom, - table.c.centroid.ST_CoveredBy( - sa.case((sa.and_(tgeom.c.rank_address > 9, - tgeom.c.geometry.is_area()), - tgeom.c.geometry), - else_=tgeom.c.centroid.ST_Expand(0.05)))) - - inner = sql.where(tgeom.c.place_id.in_(ids))\ - .group_by(table.c.place_id).subquery() - - t = conn.t.placex - sql = _select_placex(t).add_columns((-inner.c.dist).label('importance'))\ - .join(inner, inner.c.place_id == t.c.place_id)\ - .order_by(inner.c.dist) - - sql = sql.where(no_index(t.c.rank_address).between(MIN_RANK_PARAM, MAX_RANK_PARAM)) - if details.countries: - sql = sql.where(t.c.country_code.in_(COUNTRIES_PARAM)) - if details.excluded: - sql = sql.where(_exclude_places(t)) - if details.layers is not None: - sql = sql.where(_filter_by_layer(t, details.layers)) - - sql = sql.limit(LIMIT_PARAM) - for row in await conn.execute(sql, _details_to_bind_params(details)): - result = nres.create_from_placex_row(row, nres.SearchResult) - assert result - result.accuracy = self.penalty + penalty - result.bbox = Bbox.from_wkb(row.bbox) - results.append(result) - - -class PoiSearch(AbstractSearch): - """ Category search in a geographic area. - """ - def __init__(self, sdata: SearchData) -> None: - super().__init__(sdata.penalty) - self.qualifiers = sdata.qualifiers - self.countries = sdata.countries - - async def lookup(self, conn: SearchConnection, - details: SearchDetails) -> nres.SearchResults: - """ Find results for the search in the database. 
- """ - bind_params = _details_to_bind_params(details) - t = conn.t.placex - - rows: List[SaRow] = [] - - if details.near and details.near_radius is not None and details.near_radius < 0.2: - # simply search in placex table - def _base_query() -> SaSelect: - return _select_placex(t) \ - .add_columns((-t.c.centroid.ST_Distance(NEAR_PARAM)) - .label('importance'))\ - .where(t.c.linked_place_id == None) \ - .where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)) \ - .order_by(t.c.centroid.ST_Distance(NEAR_PARAM)) \ - .limit(LIMIT_PARAM) - - classtype = self.qualifiers.values - if len(classtype) == 1: - cclass, ctype = classtype[0] - sql: SaLambdaSelect = sa.lambda_stmt( - lambda: _base_query().where(t.c.class_ == cclass) - .where(t.c.type == ctype)) - else: - sql = _base_query().where(sa.or_(*(sa.and_(t.c.class_ == cls, t.c.type == typ) - for cls, typ in classtype))) - - if self.countries: - sql = sql.where(t.c.country_code.in_(self.countries.values)) - - if details.viewbox is not None and details.bounded_viewbox: - sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM)) - - rows.extend(await conn.execute(sql, bind_params)) - else: - # use the class type tables - for category in self.qualifiers.values: - table = await conn.get_class_table(*category) - if table is not None: - sql = _select_placex(t)\ - .add_columns(t.c.importance)\ - .join(table, t.c.place_id == table.c.place_id)\ - .where(t.c.class_ == category[0])\ - .where(t.c.type == category[1]) - - if details.viewbox is not None and details.bounded_viewbox: - sql = sql.where(table.c.centroid.intersects(VIEWBOX_PARAM)) - - if details.near and details.near_radius is not None: - sql = sql.order_by(table.c.centroid.ST_Distance(NEAR_PARAM))\ - .where(table.c.centroid.within_distance(NEAR_PARAM, - NEAR_RADIUS_PARAM)) - - if self.countries: - sql = sql.where(t.c.country_code.in_(self.countries.values)) - - sql = sql.limit(LIMIT_PARAM) - rows.extend(await conn.execute(sql, bind_params)) - - results = 
nres.SearchResults() - for row in rows: - result = nres.create_from_placex_row(row, nres.SearchResult) - assert result - result.accuracy = self.penalty + self.qualifiers.get_penalty((row.class_, row.type)) - result.bbox = Bbox.from_wkb(row.bbox) - results.append(result) - - return results - - -class CountrySearch(AbstractSearch): - """ Search for a country name or country code. - """ - SEARCH_PRIO = 0 - - def __init__(self, sdata: SearchData) -> None: - super().__init__(sdata.penalty) - self.countries = sdata.countries - - async def lookup(self, conn: SearchConnection, - details: SearchDetails) -> nres.SearchResults: - """ Find results for the search in the database. - """ - t = conn.t.placex - - ccodes = self.countries.values - sql = _select_placex(t)\ - .add_columns(t.c.importance)\ - .where(t.c.country_code.in_(ccodes))\ - .where(t.c.rank_address == 4) - - if details.geometry_output: - sql = _add_geometry_columns(sql, t.c.geometry, details) - - if details.excluded: - sql = sql.where(_exclude_places(t)) - - sql = filter_by_area(sql, t, details) - - results = nres.SearchResults() - for row in await conn.execute(sql, _details_to_bind_params(details)): - result = nres.create_from_placex_row(row, nres.SearchResult) - assert result - result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0) - result.bbox = Bbox.from_wkb(row.bbox) - results.append(result) - - if not results: - results = await self.lookup_in_country_table(conn, details) - - if results: - details.min_rank = min(5, details.max_rank) - details.max_rank = min(25, details.max_rank) - - return results - - async def lookup_in_country_table(self, conn: SearchConnection, - details: SearchDetails) -> nres.SearchResults: - """ Look up the country in the fallback country tables. - """ - # Avoid the fallback search when this is a more search. Country results - # usually are in the first batch of results and it is not possible - # to exclude these fallbacks. 
- if details.excluded: - return nres.SearchResults() - - t = conn.t.country_name - tgrid = conn.t.country_grid - - sql = sa.select(tgrid.c.country_code, - tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid() - .label('centroid'), - tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\ - .where(tgrid.c.country_code.in_(self.countries.values))\ - .group_by(tgrid.c.country_code) - - sql = filter_by_area(sql, tgrid, details, avoid_index=True) - - sub = sql.subquery('grid') - - sql = sa.select(t.c.country_code, - t.c.name.merge(t.c.derived_name).label('name'), - sub.c.centroid, sub.c.bbox)\ - .join(sub, t.c.country_code == sub.c.country_code) - - if details.geometry_output: - sql = _add_geometry_columns(sql, sub.c.centroid, details) - - results = nres.SearchResults() - for row in await conn.execute(sql, _details_to_bind_params(details)): - result = nres.create_from_country_row(row, nres.SearchResult) - assert result - result.bbox = Bbox.from_wkb(row.bbox) - result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0) - results.append(result) - - return results - - -class PostcodeSearch(AbstractSearch): - """ Search for a postcode. - """ - def __init__(self, extra_penalty: float, sdata: SearchData) -> None: - super().__init__(sdata.penalty + extra_penalty) - self.countries = sdata.countries - self.postcodes = sdata.postcodes - self.lookups = sdata.lookups - self.rankings = sdata.rankings - - async def lookup(self, conn: SearchConnection, - details: SearchDetails) -> nres.SearchResults: - """ Find results for the search in the database. 
- """ - t = conn.t.postcode - pcs = self.postcodes.values - - sql = sa.select(t.c.place_id, t.c.parent_place_id, - t.c.rank_search, t.c.rank_address, - t.c.postcode, t.c.country_code, - t.c.geometry.label('centroid'))\ - .where(t.c.postcode.in_(pcs)) - - if details.geometry_output: - sql = _add_geometry_columns(sql, t.c.geometry, details) - - penalty: SaExpression = sa.literal(self.penalty) - - if details.viewbox is not None and not details.bounded_viewbox: - penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM), 0.0), - (t.c.geometry.intersects(VIEWBOX2_PARAM), 0.5), - else_=1.0) - - if details.near is not None: - sql = sql.order_by(t.c.geometry.ST_Distance(NEAR_PARAM)) - - sql = filter_by_area(sql, t, details) - - if self.countries: - sql = sql.where(t.c.country_code.in_(self.countries.values)) - - if details.excluded: - sql = sql.where(_exclude_places(t)) - - if self.lookups: - assert len(self.lookups) == 1 - tsearch = conn.t.search_name - sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\ - .where((tsearch.c.name_vector + tsearch.c.nameaddress_vector) - .contains(sa.type_coerce(self.lookups[0].tokens, - IntArray))) - # Do NOT add rerank penalties based on the address terms. - # The standard rerank penalty only checks the address vector - # while terms may appear in name and address vector. This would - # lead to overly high penalties. - # We assume that a postcode is precise enough to not require - # additional full name matches. 
- - penalty += sa.case(*((t.c.postcode == v, p) for v, p in self.postcodes), - else_=1.0) - - sql = sql.add_columns(penalty.label('accuracy')) - sql = sql.order_by('accuracy').limit(LIMIT_PARAM) - - results = nres.SearchResults() - for row in await conn.execute(sql, _details_to_bind_params(details)): - p = conn.t.placex - placex_sql = _select_placex(p)\ - .add_columns(p.c.importance)\ - .where(sa.text("""class = 'boundary' - AND type = 'postal_code' - AND osm_type = 'R'"""))\ - .where(p.c.country_code == row.country_code)\ - .where(p.c.postcode == row.postcode)\ - .limit(1) - - if details.geometry_output: - placex_sql = _add_geometry_columns(placex_sql, p.c.geometry, details) - - for prow in await conn.execute(placex_sql, _details_to_bind_params(details)): - result = nres.create_from_placex_row(prow, nres.SearchResult) - if result is not None: - result.bbox = Bbox.from_wkb(prow.bbox) - break - else: - result = nres.create_from_postcode_row(row, nres.SearchResult) - - assert result - if result.place_id not in details.excluded: - result.accuracy = row.accuracy - results.append(result) - - return results - - -class PlaceSearch(AbstractSearch): - """ Generic search for an address or named place. - """ - SEARCH_PRIO = 1 - - def __init__(self, extra_penalty: float, sdata: SearchData, expected_count: int) -> None: - super().__init__(sdata.penalty + extra_penalty) - self.countries = sdata.countries - self.postcodes = sdata.postcodes - self.housenumbers = sdata.housenumbers - self.qualifiers = sdata.qualifiers - self.lookups = sdata.lookups - self.rankings = sdata.rankings - self.expected_count = expected_count - - def _inner_search_name_cte(self, conn: SearchConnection, - details: SearchDetails) -> 'sa.CTE': - """ Create a subquery that preselects the rows in the search_name - table. 
- """ - t = conn.t.search_name - - penalty: SaExpression = sa.literal(self.penalty) - for ranking in self.rankings: - penalty += ranking.sql_penalty(t) - - sql = sa.select(t.c.place_id, t.c.search_rank, t.c.address_rank, - t.c.country_code, t.c.centroid, - t.c.name_vector, t.c.nameaddress_vector, - sa.case((t.c.importance > 0, t.c.importance), - else_=0.40001-(sa.cast(t.c.search_rank, sa.Float())/75)) - .label('importance'), - penalty.label('penalty')) - - for lookup in self.lookups: - sql = sql.where(lookup.sql_condition(t)) - - if self.countries: - sql = sql.where(t.c.country_code.in_(self.countries.values)) - - if self.postcodes: - # if a postcode is given, don't search for state or country level objects - sql = sql.where(t.c.address_rank > 9) - if self.expected_count > 10000: - # Many results expected. Restrict by postcode. - tpc = conn.t.postcode - sql = sql.where(sa.select(tpc.c.postcode) - .where(tpc.c.postcode.in_(self.postcodes.values)) - .where(t.c.centroid.within_distance(tpc.c.geometry, 0.4)) - .exists()) - - if details.viewbox is not None: - if details.bounded_viewbox: - sql = sql.where(t.c.centroid - .intersects(VIEWBOX_PARAM, - use_index=details.viewbox.area < 0.2)) - elif not self.postcodes and not self.housenumbers and self.expected_count >= 10000: - sql = sql.where(t.c.centroid - .intersects(VIEWBOX2_PARAM, - use_index=details.viewbox.area < 0.5)) - - if details.near is not None and details.near_radius is not None: - if details.near_radius < 0.1: - sql = sql.where(t.c.centroid.within_distance(NEAR_PARAM, - NEAR_RADIUS_PARAM)) - else: - sql = sql.where(t.c.centroid - .ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM) - - if self.housenumbers: - sql = sql.where(t.c.address_rank.between(16, 30)) - else: - if details.excluded: - sql = sql.where(_exclude_places(t)) - if details.min_rank > 0: - sql = sql.where(sa.or_(t.c.address_rank >= MIN_RANK_PARAM, - t.c.search_rank >= MIN_RANK_PARAM)) - if details.max_rank < 30: - sql = sql.where(sa.or_(t.c.address_rank 
<= MAX_RANK_PARAM, - t.c.search_rank <= MAX_RANK_PARAM)) - - inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery() - - sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank, - inner.c.country_code, inner.c.centroid, inner.c.importance, - inner.c.penalty) - - # If the query is not an address search or has a geographic preference, - # preselect most important items to restrict the number of places - # that need to be looked up in placex. - if not self.housenumbers\ - and (details.viewbox is None or details.bounded_viewbox)\ - and (details.near is None or details.near_radius is not None)\ - and not self.qualifiers: - sql = sql.add_columns(sa.func.first_value(inner.c.penalty - inner.c.importance) - .over(order_by=inner.c.penalty - inner.c.importance) - .label('min_penalty')) - - inner = sql.subquery() - - sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank, - inner.c.country_code, inner.c.centroid, inner.c.importance, - inner.c.penalty)\ - .where(inner.c.penalty - inner.c.importance < inner.c.min_penalty + 0.5) - - return sql.cte('searches') - - async def lookup(self, conn: SearchConnection, - details: SearchDetails) -> nres.SearchResults: - """ Find results for the search in the database. 
- """ - t = conn.t.placex - tsearch = self._inner_search_name_cte(conn, details) - - sql = _select_placex(t).join(tsearch, t.c.place_id == tsearch.c.place_id) - - if details.geometry_output: - sql = _add_geometry_columns(sql, t.c.geometry, details) - - penalty: SaExpression = tsearch.c.penalty - - if self.postcodes: - tpc = conn.t.postcode - pcs = self.postcodes.values - - pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\ - .where(tpc.c.postcode.in_(pcs))\ - .scalar_subquery() - penalty += sa.case((t.c.postcode.in_(pcs), 0.0), - else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0))) - - if details.viewbox is not None and not details.bounded_viewbox: - penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0), - (t.c.geometry.intersects(VIEWBOX2_PARAM, use_index=False), 0.5), - else_=1.0) - - if details.near is not None: - sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM)) - .label('importance')) - sql = sql.order_by(sa.desc(sa.text('importance'))) - else: - sql = sql.order_by(penalty - tsearch.c.importance) - sql = sql.add_columns(tsearch.c.importance) - - sql = sql.add_columns(penalty.label('accuracy'))\ - .order_by(sa.text('accuracy')) - - if self.housenumbers: - hnr_list = '|'.join(self.housenumbers.values) - inner = sql.where(sa.or_(tsearch.c.address_rank < 30, - sa.func.RegexpWord(hnr_list, t.c.housenumber)))\ - .subquery() - - # Housenumbers from placex - thnr = conn.t.placex.alias('hnr') - pid_list = sa.func.ArrayAgg(thnr.c.place_id) - place_sql = sa.select(pid_list)\ - .where(thnr.c.parent_place_id == inner.c.place_id)\ - .where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\ - .where(thnr.c.linked_place_id == None)\ - .where(thnr.c.indexed_status == 0) - - if details.excluded: - place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded'))) - if self.qualifiers: - place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr)) - - numerals = [int(n) for n in 
self.housenumbers.values - if n.isdigit() and len(n) < 8] - interpol_sql: SaColumn - tiger_sql: SaColumn - if numerals and \ - (not self.qualifiers or ('place', 'house') in self.qualifiers.values): - # Housenumbers from interpolations - interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner, - numerals, details) - # Housenumbers from Tiger - tiger_sql = sa.case((inner.c.country_code == 'us', - _make_interpolation_subquery(conn.t.tiger, inner, - numerals, details) - ), else_=None) - else: - interpol_sql = sa.null() - tiger_sql = sa.null() - - unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'), - interpol_sql.label('interpol_hnr'), - tiger_sql.label('tiger_hnr')).subquery('unsort') - sql = sa.select(unsort)\ - .order_by(sa.case((unsort.c.placex_hnr != None, 1), - (unsort.c.interpol_hnr != None, 2), - (unsort.c.tiger_hnr != None, 3), - else_=4), - unsort.c.accuracy) - else: - sql = sql.where(t.c.linked_place_id == None)\ - .where(t.c.indexed_status == 0) - if self.qualifiers: - sql = sql.where(self.qualifiers.sql_restrict(t)) - if details.layers is not None: - sql = sql.where(_filter_by_layer(t, details.layers)) - - sql = sql.limit(LIMIT_PARAM) - - results = nres.SearchResults() - for row in await conn.execute(sql, _details_to_bind_params(details)): - result = nres.create_from_placex_row(row, nres.SearchResult) - assert result - result.bbox = Bbox.from_wkb(row.bbox) - result.accuracy = row.accuracy - if self.housenumbers and row.rank_address < 30: - if row.placex_hnr: - subs = _get_placex_housenumbers(conn, row.placex_hnr, details) - elif row.interpol_hnr: - subs = _get_osmline(conn, row.interpol_hnr, numerals, details) - elif row.tiger_hnr: - subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details) - else: - subs = None - - if subs is not None: - async for sub in subs: - assert sub.housenumber - sub.accuracy = result.accuracy - if not any(nr in self.housenumbers.values - for nr in sub.housenumber.split(';')): - 
sub.accuracy += 0.6 - results.append(sub) - - # Only add the street as a result, if it meets all other - # filter conditions. - if (not details.excluded or result.place_id not in details.excluded)\ - and (not self.qualifiers or result.category in self.qualifiers.values)\ - and result.rank_address >= details.min_rank: - result.accuracy += 1.0 # penalty for missing housenumber - results.append(result) - else: - results.append(result) - - return results diff --git a/src/nominatim_api/search/db_searches/__init__.py b/src/nominatim_api/search/db_searches/__init__.py new file mode 100644 index 00000000..fb63d874 --- /dev/null +++ b/src/nominatim_api/search/db_searches/__init__.py @@ -0,0 +1,16 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2025 by the Nominatim developer community. +# For a full list of authors see the git log. +""" +Module implementing the actual database accesses for forward search. +""" + +from .base import AbstractSearch as AbstractSearch +from .near_search import NearSearch as NearSearch +from .poi_search import PoiSearch as PoiSearch +from .country_search import CountrySearch as CountrySearch +from .postcode_search import PostcodeSearch as PostcodeSearch +from .place_search import PlaceSearch as PlaceSearch diff --git a/src/nominatim_api/search/db_searches/base.py b/src/nominatim_api/search/db_searches/base.py new file mode 100644 index 00000000..f8823087 --- /dev/null +++ b/src/nominatim_api/search/db_searches/base.py @@ -0,0 +1,144 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2025 by the Nominatim developer community. +# For a full list of authors see the git log. +""" +Interface for classes implementing a database search. 
+""" +from typing import Callable, List +import abc + +import sqlalchemy as sa + +from ...typing import SaFromClause, SaSelect, SaColumn, SaExpression, SaLambdaSelect +from ...sql.sqlalchemy_types import Geometry +from ...connection import SearchConnection +from ...types import SearchDetails, DataLayer, GeometryFormat +from ...results import SearchResults + + +class AbstractSearch(abc.ABC): + """ Encapsulation of a single lookup in the database. + """ + SEARCH_PRIO: int = 2 + + def __init__(self, penalty: float) -> None: + self.penalty = penalty + + @abc.abstractmethod + async def lookup(self, conn: SearchConnection, details: SearchDetails) -> SearchResults: + """ Find results for the search in the database. + """ + + +def select_placex(t: SaFromClause) -> SaSelect: + """ Return the basic select query for placex which returns all + fields necessary to fill a Nominatim result. 't' must either be + the placex table or a subquery returning appropriate fields from + a placex-related query. + """ + return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name, + t.c.class_, t.c.type, + t.c.address, t.c.extratags, + t.c.housenumber, t.c.postcode, t.c.country_code, + t.c.wikipedia, + t.c.parent_place_id, t.c.rank_address, t.c.rank_search, + t.c.linked_place_id, t.c.admin_level, + t.c.centroid, + t.c.geometry.ST_Expand(0).label('bbox')) + + +def exclude_places(t: SaFromClause) -> Callable[[], SaExpression]: + """ Return an expression to exclude place IDs from the list in the + SearchDetails. + + Requires the excluded IDs to be supplied as a bind parameter in SQL. + """ + return lambda: t.c.place_id.not_in(sa.bindparam('excluded')) + + +def filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn: + """ Return an expression that filters the given table by layers. 
+ """ + orexpr: List[SaExpression] = [] + if layers & DataLayer.ADDRESS and layers & DataLayer.POI: + orexpr.append(no_index(table.c.rank_address).between(1, 30)) + elif layers & DataLayer.ADDRESS: + orexpr.append(no_index(table.c.rank_address).between(1, 29)) + orexpr.append(sa.func.IsAddressPoint(table)) + elif layers & DataLayer.POI: + orexpr.append(sa.and_(no_index(table.c.rank_address) == 30, + table.c.class_.not_in(('place', 'building')))) + + if layers & DataLayer.MANMADE: + exclude = [] + if not layers & DataLayer.RAILWAY: + exclude.append('railway') + if not layers & DataLayer.NATURAL: + exclude.extend(('natural', 'water', 'waterway')) + orexpr.append(sa.and_(table.c.class_.not_in(tuple(exclude)), + no_index(table.c.rank_address) == 0)) + else: + include = [] + if layers & DataLayer.RAILWAY: + include.append('railway') + if layers & DataLayer.NATURAL: + include.extend(('natural', 'water', 'waterway')) + orexpr.append(sa.and_(table.c.class_.in_(tuple(include)), + no_index(table.c.rank_address) == 0)) + + if len(orexpr) == 1: + return orexpr[0] + + return sa.or_(*orexpr) + + +def no_index(expr: SaColumn) -> SaColumn: + """ Wrap the given expression, so that the query planner will + refrain from using the expression for index lookup. + """ + return sa.func.coalesce(sa.null(), expr) + + +def filter_by_area(sql: SaSelect, t: SaFromClause, + details: SearchDetails, avoid_index: bool = False) -> SaSelect: + """ Apply SQL statements for filtering by viewbox and near point, + if applicable. 
+ """ + if details.near is not None and details.near_radius is not None: + if details.near_radius < 0.1 and not avoid_index: + sql = sql.where( + t.c.geometry.within_distance(sa.bindparam('near', type_=Geometry), + sa.bindparam('near_radius'))) + else: + sql = sql.where( + t.c.geometry.ST_Distance( + sa.bindparam('near', type_=Geometry)) <= sa.bindparam('near_radius')) + if details.viewbox is not None and details.bounded_viewbox: + sql = sql.where(t.c.geometry.intersects(sa.bindparam('viewbox', type_=Geometry), + use_index=not avoid_index and + details.viewbox.area < 0.2)) + + return sql + + +def add_geometry_columns(sql: SaLambdaSelect, col: SaColumn, details: SearchDetails) -> SaSelect: + """ Add columns for requested geometry formats and return the new query. + """ + out = [] + + if details.geometry_simplification > 0.0: + col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification) + + if details.geometry_output & GeometryFormat.GEOJSON: + out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson')) + if details.geometry_output & GeometryFormat.TEXT: + out.append(sa.func.ST_AsText(col).label('geometry_text')) + if details.geometry_output & GeometryFormat.KML: + out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml')) + if details.geometry_output & GeometryFormat.SVG: + out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg')) + + return sql.add_columns(*out) diff --git a/src/nominatim_api/search/db_searches/country_search.py b/src/nominatim_api/search/db_searches/country_search.py new file mode 100644 index 00000000..30d85f3b --- /dev/null +++ b/src/nominatim_api/search/db_searches/country_search.py @@ -0,0 +1,119 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2025 by the Nominatim developer community. +# For a full list of authors see the git log. +""" +Implementation of searches for a country. +""" + +import sqlalchemy as sa + +from . 
import base +from ..db_search_fields import SearchData +from ... import results as nres +from ...connection import SearchConnection +from ...types import SearchDetails, Bbox + + +class CountrySearch(base.AbstractSearch): + """ Search for a country name or country code. + """ + SEARCH_PRIO = 0 + + def __init__(self, sdata: SearchData) -> None: + super().__init__(sdata.penalty) + self.countries = sdata.countries + + async def lookup(self, conn: SearchConnection, + details: SearchDetails) -> nres.SearchResults: + """ Find results for the search in the database. + """ + t = conn.t.placex + + ccodes = self.countries.values + sql = base.select_placex(t)\ + .add_columns(t.c.importance)\ + .where(t.c.country_code.in_(ccodes))\ + .where(t.c.rank_address == 4) + + if details.geometry_output: + sql = base.add_geometry_columns(sql, t.c.geometry, details) + + if details.excluded: + sql = sql.where(base.exclude_places(t)) + + sql = base.filter_by_area(sql, t, details) + + bind_params = { + 'excluded': details.excluded, + 'viewbox': details.viewbox, + 'near': details.near, + 'near_radius': details.near_radius + } + + results = nres.SearchResults() + for row in await conn.execute(sql, bind_params): + result = nres.create_from_placex_row(row, nres.SearchResult) + assert result + result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0) + result.bbox = Bbox.from_wkb(row.bbox) + results.append(result) + + if not results: + results = await self.lookup_in_country_table(conn, details) + + if results: + details.min_rank = min(5, details.max_rank) + details.max_rank = min(25, details.max_rank) + + return results + + async def lookup_in_country_table(self, conn: SearchConnection, + details: SearchDetails) -> nres.SearchResults: + """ Look up the country in the fallback country tables. + """ + # Avoid the fallback search when this is a more search. Country results + # usually are in the first batch of results and it is not possible + # to exclude these fallbacks. 
+ if details.excluded: + return nres.SearchResults() + + t = conn.t.country_name + tgrid = conn.t.country_grid + + sql = sa.select(tgrid.c.country_code, + tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid() + .label('centroid'), + tgrid.c.geometry.ST_Collect().ST_Expand(0).label('bbox'))\ + .where(tgrid.c.country_code.in_(self.countries.values))\ + .group_by(tgrid.c.country_code) + + sql = base.filter_by_area(sql, tgrid, details, avoid_index=True) + + sub = sql.subquery('grid') + + sql = sa.select(t.c.country_code, + t.c.name.merge(t.c.derived_name).label('name'), + sub.c.centroid, sub.c.bbox)\ + .join(sub, t.c.country_code == sub.c.country_code) + + if details.geometry_output: + sql = base.add_geometry_columns(sql, sub.c.centroid, details) + + bind_params = { + 'viewbox': details.viewbox, + 'near': details.near, + 'near_radius': details.near_radius + } + + results = nres.SearchResults() + for row in await conn.execute(sql, bind_params): + result = nres.create_from_country_row(row, nres.SearchResult) + assert result + result.bbox = Bbox.from_wkb(row.bbox) + result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0) + results.append(result) + + return results diff --git a/src/nominatim_api/search/db_searches/near_search.py b/src/nominatim_api/search/db_searches/near_search.py new file mode 100644 index 00000000..37f99a6a --- /dev/null +++ b/src/nominatim_api/search/db_searches/near_search.py @@ -0,0 +1,136 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2025 by the Nominatim developer community. +# For a full list of authors see the git log. +""" +Implementation of a category search around a place. +""" +from typing import List, Tuple + +import sqlalchemy as sa + +from . import base +from ...typing import SaBind +from ...types import SearchDetails, Bbox +from ...connection import SearchConnection +from ... 
import results as nres +from ..db_search_fields import WeightedCategories + + +LIMIT_PARAM: SaBind = sa.bindparam('limit') +MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank') +MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank') +COUNTRIES_PARAM: SaBind = sa.bindparam('countries') + + +class NearSearch(base.AbstractSearch): + """ Category search of a place type near the result of another search. + """ + def __init__(self, penalty: float, categories: WeightedCategories, + search: base.AbstractSearch) -> None: + super().__init__(penalty) + self.search = search + self.categories = categories + + async def lookup(self, conn: SearchConnection, + details: SearchDetails) -> nres.SearchResults: + """ Find results for the search in the database. + """ + results = nres.SearchResults() + base = await self.search.lookup(conn, details) + + if not base: + return results + + base.sort(key=lambda r: (r.accuracy, r.rank_search)) + max_accuracy = base[0].accuracy + 0.5 + if base[0].rank_address == 0: + min_rank = 0 + max_rank = 0 + elif base[0].rank_address < 26: + min_rank = 1 + max_rank = min(25, base[0].rank_address + 4) + else: + min_rank = 26 + max_rank = 30 + base = nres.SearchResults(r for r in base + if (r.source_table == nres.SourceTable.PLACEX + and r.accuracy <= max_accuracy + and r.bbox and r.bbox.area < 20 + and r.rank_address >= min_rank + and r.rank_address <= max_rank)) + + if base: + baseids = [b.place_id for b in base[:5] if b.place_id] + + for category, penalty in self.categories: + await self.lookup_category(results, conn, baseids, category, penalty, details) + if len(results) >= details.max_results: + break + + return results + + async def lookup_category(self, results: nres.SearchResults, + conn: SearchConnection, ids: List[int], + category: Tuple[str, str], penalty: float, + details: SearchDetails) -> None: + """ Find places of the given category near the list of + place ids and add the results to 'results'. 
+ """ + table = await conn.get_class_table(*category) + + tgeom = conn.t.placex.alias('pgeom') + + if table is None: + # No classtype table available, do a simplified lookup in placex. + table = conn.t.placex + sql = sa.select(table.c.place_id, + sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid)) + .label('dist'))\ + .join(tgeom, table.c.geometry.intersects(tgeom.c.centroid.ST_Expand(0.01)))\ + .where(table.c.class_ == category[0])\ + .where(table.c.type == category[1]) + else: + # Use classtype table. We can afford to use a larger + # radius for the lookup. + sql = sa.select(table.c.place_id, + sa.func.min(tgeom.c.centroid.ST_Distance(table.c.centroid)) + .label('dist'))\ + .join(tgeom, + table.c.centroid.ST_CoveredBy( + sa.case((sa.and_(tgeom.c.rank_address > 9, + tgeom.c.geometry.is_area()), + tgeom.c.geometry), + else_=tgeom.c.centroid.ST_Expand(0.05)))) + + inner = sql.where(tgeom.c.place_id.in_(ids))\ + .group_by(table.c.place_id).subquery() + + t = conn.t.placex + sql = base.select_placex(t).add_columns((-inner.c.dist).label('importance'))\ + .join(inner, inner.c.place_id == t.c.place_id)\ + .order_by(inner.c.dist) + + sql = sql.where(base.no_index(t.c.rank_address).between(MIN_RANK_PARAM, MAX_RANK_PARAM)) + if details.countries: + sql = sql.where(t.c.country_code.in_(COUNTRIES_PARAM)) + if details.excluded: + sql = sql.where(base.exclude_places(t)) + if details.layers is not None: + sql = sql.where(base.filter_by_layer(t, details.layers)) + + sql = sql.limit(LIMIT_PARAM) + + bind_params = {'limit': details.max_results, + 'min_rank': details.min_rank, + 'max_rank': details.max_rank, + 'excluded': details.excluded, + 'countries': details.countries} + for row in await conn.execute(sql, bind_params): + result = nres.create_from_placex_row(row, nres.SearchResult) + assert result + result.accuracy = self.penalty + penalty + result.bbox = Bbox.from_wkb(row.bbox) + results.append(result) diff --git a/src/nominatim_api/search/db_searches/place_search.py 
b/src/nominatim_api/search/db_searches/place_search.py new file mode 100644 index 00000000..8e9e6335 --- /dev/null +++ b/src/nominatim_api/search/db_searches/place_search.py @@ -0,0 +1,385 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2025 by the Nominatim developer community. +# For a full list of authors see the git log. +""" +Implementation of search for a named place. +""" +from typing import cast, List, AsyncIterator + +import sqlalchemy as sa + +from . import base +from ...typing import SaBind, SaExpression, SaColumn, SaFromClause, SaScalarSelect +from ...types import SearchDetails, Bbox +from ...sql.sqlalchemy_types import Geometry +from ...connection import SearchConnection +from ... import results as nres +from ..db_search_fields import SearchData + + +LIMIT_PARAM: SaBind = sa.bindparam('limit') +MIN_RANK_PARAM: SaBind = sa.bindparam('min_rank') +MAX_RANK_PARAM: SaBind = sa.bindparam('max_rank') +VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry) +VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry) +NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry) +NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius') +COUNTRIES_PARAM: SaBind = sa.bindparam('countries') + + +def _int_list_to_subquery(inp: List[int]) -> 'sa.Subquery': + """ Create a subselect that returns the given list of integers + as rows in the column 'nr'. 
+ """ + vtab = sa.func.JsonArrayEach(sa.type_coerce(inp, sa.JSON))\ + .table_valued(sa.column('value', type_=sa.JSON)) + return sa.select(sa.cast(sa.cast(vtab.c.value, sa.Text), sa.Integer).label('nr')).subquery() + + +def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn: + pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber) + return sa.case( + (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()), + else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid') + + +def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause, + numerals: List[int], details: SearchDetails) -> SaScalarSelect: + all_ids = sa.func.ArrayAgg(table.c.place_id) + sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id) + + if len(numerals) == 1: + sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\ + .where((numerals[0] - table.c.startnumber) % table.c.step == 0) + else: + sql = sql.where(sa.or_( + *(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber), + (n - table.c.startnumber) % table.c.step == 0) + for n in numerals))) + + if details.excluded: + sql = sql.where(base.exclude_places(table)) + + return sql.scalar_subquery() + + +async def _get_placex_housenumbers(conn: SearchConnection, + place_ids: List[int], + details: SearchDetails) -> AsyncIterator[nres.SearchResult]: + t = conn.t.placex + sql = base.select_placex(t).add_columns(t.c.importance)\ + .where(t.c.place_id.in_(place_ids)) + + if details.geometry_output: + sql = base.add_geometry_columns(sql, t.c.geometry, details) + + for row in await conn.execute(sql): + result = nres.create_from_placex_row(row, nres.SearchResult) + assert result + result.bbox = Bbox.from_wkb(row.bbox) + yield result + + +async def _get_osmline(conn: SearchConnection, place_ids: List[int], + numerals: List[int], + details: SearchDetails) -> AsyncIterator[nres.SearchResult]: + t = conn.t.osmline 
+ + values = _int_list_to_subquery(numerals) + sql = sa.select(t.c.place_id, t.c.osm_id, + t.c.parent_place_id, t.c.address, + values.c.nr.label('housenumber'), + _interpolated_position(t, values.c.nr), + t.c.postcode, t.c.country_code)\ + .where(t.c.place_id.in_(place_ids))\ + .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber)) + + if details.geometry_output: + sub = sql.subquery() + sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details) + + for row in await conn.execute(sql): + result = nres.create_from_osmline_row(row, nres.SearchResult) + assert result + yield result + + +async def _get_tiger(conn: SearchConnection, place_ids: List[int], + numerals: List[int], osm_id: int, + details: SearchDetails) -> AsyncIterator[nres.SearchResult]: + t = conn.t.tiger + values = _int_list_to_subquery(numerals) + sql = sa.select(t.c.place_id, t.c.parent_place_id, + sa.literal('W').label('osm_type'), + sa.literal(osm_id).label('osm_id'), + values.c.nr.label('housenumber'), + _interpolated_position(t, values.c.nr), + t.c.postcode)\ + .where(t.c.place_id.in_(place_ids))\ + .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber)) + + if details.geometry_output: + sub = sql.subquery() + sql = base.add_geometry_columns(sa.select(sub), sub.c.centroid, details) + + for row in await conn.execute(sql): + result = nres.create_from_tiger_row(row, nres.SearchResult) + assert result + yield result + + +class PlaceSearch(base.AbstractSearch): + """ Generic search for an address or named place. 
+ """ + SEARCH_PRIO = 1 + + def __init__(self, extra_penalty: float, sdata: SearchData, expected_count: int) -> None: + super().__init__(sdata.penalty + extra_penalty) + self.countries = sdata.countries + self.postcodes = sdata.postcodes + self.housenumbers = sdata.housenumbers + self.qualifiers = sdata.qualifiers + self.lookups = sdata.lookups + self.rankings = sdata.rankings + self.expected_count = expected_count + + def _inner_search_name_cte(self, conn: SearchConnection, + details: SearchDetails) -> 'sa.CTE': + """ Create a subquery that preselects the rows in the search_name + table. + """ + t = conn.t.search_name + + penalty: SaExpression = sa.literal(self.penalty) + for ranking in self.rankings: + penalty += ranking.sql_penalty(t) + + sql = sa.select(t.c.place_id, t.c.search_rank, t.c.address_rank, + t.c.country_code, t.c.centroid, + t.c.name_vector, t.c.nameaddress_vector, + sa.case((t.c.importance > 0, t.c.importance), + else_=0.40001-(sa.cast(t.c.search_rank, sa.Float())/75)) + .label('importance'), + penalty.label('penalty')) + + for lookup in self.lookups: + sql = sql.where(lookup.sql_condition(t)) + + if self.countries: + sql = sql.where(t.c.country_code.in_(self.countries.values)) + + if self.postcodes: + # if a postcode is given, don't search for state or country level objects + sql = sql.where(t.c.address_rank > 9) + if self.expected_count > 10000: + # Many results expected. Restrict by postcode. 
+ tpc = conn.t.postcode + sql = sql.where(sa.select(tpc.c.postcode) + .where(tpc.c.postcode.in_(self.postcodes.values)) + .where(t.c.centroid.within_distance(tpc.c.geometry, 0.4)) + .exists()) + + if details.viewbox is not None: + if details.bounded_viewbox: + sql = sql.where(t.c.centroid + .intersects(VIEWBOX_PARAM, + use_index=details.viewbox.area < 0.2)) + elif not self.postcodes and not self.housenumbers and self.expected_count >= 10000: + sql = sql.where(t.c.centroid + .intersects(VIEWBOX2_PARAM, + use_index=details.viewbox.area < 0.5)) + + if details.near is not None and details.near_radius is not None: + if details.near_radius < 0.1: + sql = sql.where(t.c.centroid.within_distance(NEAR_PARAM, + NEAR_RADIUS_PARAM)) + else: + sql = sql.where(t.c.centroid + .ST_Distance(NEAR_PARAM) < NEAR_RADIUS_PARAM) + + if self.housenumbers: + sql = sql.where(t.c.address_rank.between(16, 30)) + else: + if details.excluded: + sql = sql.where(base.exclude_places(t)) + if details.min_rank > 0: + sql = sql.where(sa.or_(t.c.address_rank >= MIN_RANK_PARAM, + t.c.search_rank >= MIN_RANK_PARAM)) + if details.max_rank < 30: + sql = sql.where(sa.or_(t.c.address_rank <= MAX_RANK_PARAM, + t.c.search_rank <= MAX_RANK_PARAM)) + + inner = sql.limit(10000).order_by(sa.desc(sa.text('importance'))).subquery() + + sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank, + inner.c.country_code, inner.c.centroid, inner.c.importance, + inner.c.penalty) + + # If the query is not an address search or has a geographic preference, + # preselect most important items to restrict the number of places + # that need to be looked up in placex. 
+ if not self.housenumbers\ + and (details.viewbox is None or details.bounded_viewbox)\ + and (details.near is None or details.near_radius is not None)\ + and not self.qualifiers: + sql = sql.add_columns(sa.func.first_value(inner.c.penalty - inner.c.importance) + .over(order_by=inner.c.penalty - inner.c.importance) + .label('min_penalty')) + + inner = sql.subquery() + + sql = sa.select(inner.c.place_id, inner.c.search_rank, inner.c.address_rank, + inner.c.country_code, inner.c.centroid, inner.c.importance, + inner.c.penalty)\ + .where(inner.c.penalty - inner.c.importance < inner.c.min_penalty + 0.5) + + return sql.cte('searches') + + async def lookup(self, conn: SearchConnection, + details: SearchDetails) -> nres.SearchResults: + """ Find results for the search in the database. + """ + t = conn.t.placex + tsearch = self._inner_search_name_cte(conn, details) + + sql = base.select_placex(t).join(tsearch, t.c.place_id == tsearch.c.place_id) + + if details.geometry_output: + sql = base.add_geometry_columns(sql, t.c.geometry, details) + + penalty: SaExpression = tsearch.c.penalty + + if self.postcodes: + tpc = conn.t.postcode + pcs = self.postcodes.values + + pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(t.c.centroid)))\ + .where(tpc.c.postcode.in_(pcs))\ + .scalar_subquery() + penalty += sa.case((t.c.postcode.in_(pcs), 0.0), + else_=sa.func.coalesce(pc_near, cast(SaColumn, 2.0))) + + if details.viewbox is not None and not details.bounded_viewbox: + penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM, use_index=False), 0.0), + (t.c.geometry.intersects(VIEWBOX2_PARAM, use_index=False), 0.5), + else_=1.0) + + if details.near is not None: + sql = sql.add_columns((-tsearch.c.centroid.ST_Distance(NEAR_PARAM)) + .label('importance')) + sql = sql.order_by(sa.desc(sa.text('importance'))) + else: + sql = sql.order_by(penalty - tsearch.c.importance) + sql = sql.add_columns(tsearch.c.importance) + + sql = sql.add_columns(penalty.label('accuracy'))\ + 
.order_by(sa.text('accuracy')) + + if self.housenumbers: + hnr_list = '|'.join(self.housenumbers.values) + inner = sql.where(sa.or_(tsearch.c.address_rank < 30, + sa.func.RegexpWord(hnr_list, t.c.housenumber)))\ + .subquery() + + # Housenumbers from placex + thnr = conn.t.placex.alias('hnr') + pid_list = sa.func.ArrayAgg(thnr.c.place_id) + place_sql = sa.select(pid_list)\ + .where(thnr.c.parent_place_id == inner.c.place_id)\ + .where(sa.func.RegexpWord(hnr_list, thnr.c.housenumber))\ + .where(thnr.c.linked_place_id == None)\ + .where(thnr.c.indexed_status == 0) + + if details.excluded: + place_sql = place_sql.where(thnr.c.place_id.not_in(sa.bindparam('excluded'))) + if self.qualifiers: + place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr)) + + numerals = [int(n) for n in self.housenumbers.values + if n.isdigit() and len(n) < 8] + interpol_sql: SaColumn + tiger_sql: SaColumn + if numerals and \ + (not self.qualifiers or ('place', 'house') in self.qualifiers.values): + # Housenumbers from interpolations + interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner, + numerals, details) + # Housenumbers from Tiger + tiger_sql = sa.case((inner.c.country_code == 'us', + _make_interpolation_subquery(conn.t.tiger, inner, + numerals, details) + ), else_=None) + else: + interpol_sql = sa.null() + tiger_sql = sa.null() + + unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'), + interpol_sql.label('interpol_hnr'), + tiger_sql.label('tiger_hnr')).subquery('unsort') + sql = sa.select(unsort)\ + .order_by(sa.case((unsort.c.placex_hnr != None, 1), + (unsort.c.interpol_hnr != None, 2), + (unsort.c.tiger_hnr != None, 3), + else_=4), + unsort.c.accuracy) + else: + sql = sql.where(t.c.linked_place_id == None)\ + .where(t.c.indexed_status == 0) + if self.qualifiers: + sql = sql.where(self.qualifiers.sql_restrict(t)) + if details.layers is not None: + sql = sql.where(base.filter_by_layer(t, details.layers)) + + sql = sql.limit(LIMIT_PARAM) + + 
bind_params = { + 'limit': details.max_results, + 'min_rank': details.min_rank, + 'max_rank': details.max_rank, + 'viewbox': details.viewbox, + 'viewbox2': details.viewbox_x2, + 'near': details.near, + 'near_radius': details.near_radius, + 'excluded': details.excluded, + 'countries': details.countries + } + + results = nres.SearchResults() + for row in await conn.execute(sql, bind_params): + result = nres.create_from_placex_row(row, nres.SearchResult) + assert result + result.bbox = Bbox.from_wkb(row.bbox) + result.accuracy = row.accuracy + if self.housenumbers and row.rank_address < 30: + if row.placex_hnr: + subs = _get_placex_housenumbers(conn, row.placex_hnr, details) + elif row.interpol_hnr: + subs = _get_osmline(conn, row.interpol_hnr, numerals, details) + elif row.tiger_hnr: + subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details) + else: + subs = None + + if subs is not None: + async for sub in subs: + assert sub.housenumber + sub.accuracy = result.accuracy + if not any(nr in self.housenumbers.values + for nr in sub.housenumber.split(';')): + sub.accuracy += 0.6 + results.append(sub) + + # Only add the street as a result, if it meets all other + # filter conditions. + if (not details.excluded or result.place_id not in details.excluded)\ + and (not self.qualifiers or result.category in self.qualifiers.values)\ + and result.rank_address >= details.min_rank: + result.accuracy += 1.0 # penalty for missing housenumber + results.append(result) + else: + results.append(result) + + return results diff --git a/src/nominatim_api/search/db_searches/poi_search.py b/src/nominatim_api/search/db_searches/poi_search.py new file mode 100644 index 00000000..fd7cdce8 --- /dev/null +++ b/src/nominatim_api/search/db_searches/poi_search.py @@ -0,0 +1,114 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2025 by the Nominatim developer community. 
+# For a full list of authors see the git log. +""" +Implementation of category search. +""" +from typing import List + +import sqlalchemy as sa + +from . import base +from ..db_search_fields import SearchData +from ... import results as nres +from ...typing import SaBind, SaRow, SaSelect, SaLambdaSelect +from ...sql.sqlalchemy_types import Geometry +from ...connection import SearchConnection +from ...types import SearchDetails, Bbox + + +LIMIT_PARAM: SaBind = sa.bindparam('limit') +VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry) +NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry) +NEAR_RADIUS_PARAM: SaBind = sa.bindparam('near_radius') + + +class PoiSearch(base.AbstractSearch): + """ Category search in a geographic area. + """ + def __init__(self, sdata: SearchData) -> None: + super().__init__(sdata.penalty) + self.qualifiers = sdata.qualifiers + self.countries = sdata.countries + + async def lookup(self, conn: SearchConnection, + details: SearchDetails) -> nres.SearchResults: + """ Find results for the search in the database. 
+ """ + bind_params = { + 'limit': details.max_results, + 'viewbox': details.viewbox, + 'near': details.near, + 'near_radius': details.near_radius, + 'excluded': details.excluded + } + + t = conn.t.placex + + rows: List[SaRow] = [] + + if details.near and details.near_radius is not None and details.near_radius < 0.2: + # simply search in placex table + def _base_query() -> SaSelect: + return base.select_placex(t) \ + .add_columns((-t.c.centroid.ST_Distance(NEAR_PARAM)) + .label('importance'))\ + .where(t.c.linked_place_id == None) \ + .where(t.c.geometry.within_distance(NEAR_PARAM, NEAR_RADIUS_PARAM)) \ + .order_by(t.c.centroid.ST_Distance(NEAR_PARAM)) \ + .limit(LIMIT_PARAM) + + classtype = self.qualifiers.values + if len(classtype) == 1: + cclass, ctype = classtype[0] + sql: SaLambdaSelect = sa.lambda_stmt( + lambda: _base_query().where(t.c.class_ == cclass) + .where(t.c.type == ctype)) + else: + sql = _base_query().where(sa.or_(*(sa.and_(t.c.class_ == cls, t.c.type == typ) + for cls, typ in classtype))) + + if self.countries: + sql = sql.where(t.c.country_code.in_(self.countries.values)) + + if details.viewbox is not None and details.bounded_viewbox: + sql = sql.where(t.c.geometry.intersects(VIEWBOX_PARAM)) + + rows.extend(await conn.execute(sql, bind_params)) + else: + # use the class type tables + for category in self.qualifiers.values: + table = await conn.get_class_table(*category) + if table is not None: + sql = base.select_placex(t)\ + .add_columns(t.c.importance)\ + .join(table, t.c.place_id == table.c.place_id)\ + .where(t.c.class_ == category[0])\ + .where(t.c.type == category[1]) + + if details.viewbox is not None and details.bounded_viewbox: + sql = sql.where(table.c.centroid.intersects(VIEWBOX_PARAM)) + + if details.near and details.near_radius is not None: + sql = sql.order_by(table.c.centroid.ST_Distance(NEAR_PARAM))\ + .where(table.c.centroid.within_distance(NEAR_PARAM, + NEAR_RADIUS_PARAM)) + + if self.countries: + sql = 
sql.where(t.c.country_code.in_(self.countries.values)) + + sql = sql.limit(LIMIT_PARAM) + rows.extend(await conn.execute(sql, bind_params)) + + results = nres.SearchResults() + for row in rows: + result = nres.create_from_placex_row(row, nres.SearchResult) + assert result + result.accuracy = self.penalty + self.qualifiers.get_penalty((row.class_, row.type)) + result.bbox = Bbox.from_wkb(row.bbox) + results.append(result) + + return results diff --git a/src/nominatim_api/search/db_searches/postcode_search.py b/src/nominatim_api/search/db_searches/postcode_search.py new file mode 100644 index 00000000..c88147d4 --- /dev/null +++ b/src/nominatim_api/search/db_searches/postcode_search.py @@ -0,0 +1,129 @@ +# SPDX-License-Identifier: GPL-3.0-or-later +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2025 by the Nominatim developer community. +# For a full list of authors see the git log. +""" +Implementation of search for a postcode. +""" + +import sqlalchemy as sa + +from . import base +from ...typing import SaBind, SaExpression +from ...sql.sqlalchemy_types import Geometry, IntArray +from ...connection import SearchConnection +from ...types import SearchDetails, Bbox +from ... import results as nres +from ..db_search_fields import SearchData + + +LIMIT_PARAM: SaBind = sa.bindparam('limit') +VIEWBOX_PARAM: SaBind = sa.bindparam('viewbox', type_=Geometry) +VIEWBOX2_PARAM: SaBind = sa.bindparam('viewbox2', type_=Geometry) +NEAR_PARAM: SaBind = sa.bindparam('near', type_=Geometry) + + +class PostcodeSearch(base.AbstractSearch): + """ Search for a postcode. 
+ """ + def __init__(self, extra_penalty: float, sdata: SearchData) -> None: + super().__init__(sdata.penalty + extra_penalty) + self.countries = sdata.countries + self.postcodes = sdata.postcodes + self.lookups = sdata.lookups + self.rankings = sdata.rankings + + async def lookup(self, conn: SearchConnection, + details: SearchDetails) -> nres.SearchResults: + """ Find results for the search in the database. + """ + t = conn.t.postcode + pcs = self.postcodes.values + + sql = sa.select(t.c.place_id, t.c.parent_place_id, + t.c.rank_search, t.c.rank_address, + t.c.postcode, t.c.country_code, + t.c.geometry.label('centroid'))\ + .where(t.c.postcode.in_(pcs)) + + if details.geometry_output: + sql = base.add_geometry_columns(sql, t.c.geometry, details) + + penalty: SaExpression = sa.literal(self.penalty) + + if details.viewbox is not None and not details.bounded_viewbox: + penalty += sa.case((t.c.geometry.intersects(VIEWBOX_PARAM), 0.0), + (t.c.geometry.intersects(VIEWBOX2_PARAM), 0.5), + else_=1.0) + + if details.near is not None: + sql = sql.order_by(t.c.geometry.ST_Distance(NEAR_PARAM)) + + sql = base.filter_by_area(sql, t, details) + + if self.countries: + sql = sql.where(t.c.country_code.in_(self.countries.values)) + + if details.excluded: + sql = sql.where(base.exclude_places(t)) + + if self.lookups: + assert len(self.lookups) == 1 + tsearch = conn.t.search_name + sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\ + .where((tsearch.c.name_vector + tsearch.c.nameaddress_vector) + .contains(sa.type_coerce(self.lookups[0].tokens, + IntArray))) + # Do NOT add rerank penalties based on the address terms. + # The standard rerank penalty only checks the address vector + # while terms may appear in name and address vector. This would + # lead to overly high penalties. + # We assume that a postcode is precise enough to not require + # additional full name matches. 
+ + penalty += sa.case(*((t.c.postcode == v, p) for v, p in self.postcodes), + else_=1.0) + + sql = sql.add_columns(penalty.label('accuracy')) + sql = sql.order_by('accuracy').limit(LIMIT_PARAM) + + bind_params = { + 'limit': details.max_results, + 'viewbox': details.viewbox, + 'viewbox2': details.viewbox_x2, + 'near': details.near, + 'near_radius': details.near_radius, + 'excluded': details.excluded + } + + results = nres.SearchResults() + for row in await conn.execute(sql, bind_params): + p = conn.t.placex + placex_sql = base.select_placex(p)\ + .add_columns(p.c.importance)\ + .where(sa.text("""class = 'boundary' + AND type = 'postal_code' + AND osm_type = 'R'"""))\ + .where(p.c.country_code == row.country_code)\ + .where(p.c.postcode == row.postcode)\ + .limit(1) + + if details.geometry_output: + placex_sql = base.add_geometry_columns(placex_sql, p.c.geometry, details) + + for prow in await conn.execute(placex_sql, bind_params): + result = nres.create_from_placex_row(prow, nres.SearchResult) + if result is not None: + result.bbox = Bbox.from_wkb(prow.bbox) + break + else: + result = nres.create_from_postcode_row(row, nres.SearchResult) + + assert result + if result.place_id not in details.excluded: + result.accuracy = row.accuracy + results.append(result) + + return results -- 2.39.5