]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/search/db_searches.py
Merge branch 'osm-search:master' into check-database-on-frozen-database
[nominatim.git] / nominatim / api / search / db_searches.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of the acutal database accesses for forward search.
9 """
10 from typing import List, Tuple, AsyncIterator
11 import abc
12
13 import sqlalchemy as sa
14 from sqlalchemy.dialects.postgresql import ARRAY, array_agg
15
16 from nominatim.typing import SaFromClause, SaScalarSelect, SaColumn, \
17                              SaExpression, SaSelect, SaRow
18 from nominatim.api.connection import SearchConnection
19 from nominatim.api.types import SearchDetails, DataLayer, GeometryFormat, Bbox
20 import nominatim.api.results as nres
21 from nominatim.api.search.db_search_fields import SearchData, WeightedCategories
22
23 #pylint: disable=singleton-comparison,not-callable
24 #pylint: disable=too-many-branches,too-many-arguments,too-many-locals,too-many-statements
25
26 def _select_placex(t: SaFromClause) -> SaSelect:
27     return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
28                      t.c.class_, t.c.type,
29                      t.c.address, t.c.extratags,
30                      t.c.housenumber, t.c.postcode, t.c.country_code,
31                      t.c.importance, t.c.wikipedia,
32                      t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
33                      t.c.centroid,
34                      t.c.geometry.ST_Expand(0).label('bbox'))
35
36
37 def _add_geometry_columns(sql: SaSelect, col: SaColumn, details: SearchDetails) -> SaSelect:
38     if not details.geometry_output:
39         return sql
40
41     out = []
42
43     if details.geometry_simplification > 0.0:
44         col = col.ST_SimplifyPreserveTopology(details.geometry_simplification)
45
46     if details.geometry_output & GeometryFormat.GEOJSON:
47         out.append(col.ST_AsGeoJSON().label('geometry_geojson'))
48     if details.geometry_output & GeometryFormat.TEXT:
49         out.append(col.ST_AsText().label('geometry_text'))
50     if details.geometry_output & GeometryFormat.KML:
51         out.append(col.ST_AsKML().label('geometry_kml'))
52     if details.geometry_output & GeometryFormat.SVG:
53         out.append(col.ST_AsSVG().label('geometry_svg'))
54
55     return sql.add_columns(*out)
56
57
58 def _make_interpolation_subquery(table: SaFromClause, inner: SaFromClause,
59                                  numerals: List[int], details: SearchDetails) -> SaScalarSelect:
60     all_ids = array_agg(table.c.place_id) # type: ignore[no-untyped-call]
61     sql = sa.select(all_ids).where(table.c.parent_place_id == inner.c.place_id)
62
63     if len(numerals) == 1:
64         sql = sql.where(sa.between(numerals[0], table.c.startnumber, table.c.endnumber))\
65                  .where((numerals[0] - table.c.startnumber) % table.c.step == 0)
66     else:
67         sql = sql.where(sa.or_(
68                 *(sa.and_(sa.between(n, table.c.startnumber, table.c.endnumber),
69                           (n - table.c.startnumber) % table.c.step == 0)
70                   for n in numerals)))
71
72     if details.excluded:
73         sql = sql.where(table.c.place_id.not_in(details.excluded))
74
75     return sql.scalar_subquery()
76
77
78 def _filter_by_layer(table: SaFromClause, layers: DataLayer) -> SaColumn:
79     orexpr: List[SaExpression] = []
80     if layers & DataLayer.ADDRESS and layers & DataLayer.POI:
81         orexpr.append(table.c.rank_address.between(1, 30))
82     elif layers & DataLayer.ADDRESS:
83         orexpr.append(table.c.rank_address.between(1, 29))
84         orexpr.append(sa.and_(table.c.rank_address == 30,
85                               sa.or_(table.c.housenumber != None,
86                                      table.c.address.has_key('housename'))))
87     elif layers & DataLayer.POI:
88         orexpr.append(sa.and_(table.c.rank_address == 30,
89                               table.c.class_.not_in(('place', 'building'))))
90
91     if layers & DataLayer.MANMADE:
92         exclude = []
93         if not layers & DataLayer.RAILWAY:
94             exclude.append('railway')
95         if not layers & DataLayer.NATURAL:
96             exclude.extend(('natural', 'water', 'waterway'))
97         orexpr.append(sa.and_(table.c.class_.not_in(tuple(exclude)),
98                               table.c.rank_address == 0))
99     else:
100         include = []
101         if layers & DataLayer.RAILWAY:
102             include.append('railway')
103         if layers & DataLayer.NATURAL:
104             include.extend(('natural', 'water', 'waterway'))
105         orexpr.append(sa.and_(table.c.class_.in_(tuple(include)),
106                               table.c.rank_address == 0))
107
108     if len(orexpr) == 1:
109         return orexpr[0]
110
111     return sa.or_(*orexpr)
112
113
114 def _interpolated_position(table: SaFromClause, nr: SaColumn) -> SaColumn:
115     pos = sa.cast(nr - table.c.startnumber, sa.Float) / (table.c.endnumber - table.c.startnumber)
116     return sa.case(
117             (table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
118             else_=table.c.linegeo.ST_LineInterpolatePoint(pos)).label('centroid')
119
120
121 async def _get_placex_housenumbers(conn: SearchConnection,
122                                    place_ids: List[int],
123                                    details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
124     t = conn.t.placex
125     sql = _select_placex(t).where(t.c.place_id.in_(place_ids))
126
127     sql = _add_geometry_columns(sql, t.c.geometry, details)
128
129     for row in await conn.execute(sql):
130         result = nres.create_from_placex_row(row, nres.SearchResult)
131         assert result
132         result.bbox = Bbox.from_wkb(row.bbox.data)
133         yield result
134
135
136 async def _get_osmline(conn: SearchConnection, place_ids: List[int],
137                        numerals: List[int],
138                        details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
139     t = conn.t.osmline
140     values = sa.values(sa.Column('nr', sa.Integer()), name='housenumber')\
141                .data([(n,) for n in numerals])
142     sql = sa.select(t.c.place_id, t.c.osm_id,
143                     t.c.parent_place_id, t.c.address,
144                     values.c.nr.label('housenumber'),
145                     _interpolated_position(t, values.c.nr),
146                     t.c.postcode, t.c.country_code)\
147             .where(t.c.place_id.in_(place_ids))\
148             .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
149
150     if details.geometry_output:
151         sub = sql.subquery()
152         sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details)
153
154     for row in await conn.execute(sql):
155         result = nres.create_from_osmline_row(row, nres.SearchResult)
156         assert result
157         yield result
158
159
160 async def _get_tiger(conn: SearchConnection, place_ids: List[int],
161                      numerals: List[int], osm_id: int,
162                      details: SearchDetails) -> AsyncIterator[nres.SearchResult]:
163     t = conn.t.tiger
164     values = sa.values(sa.Column('nr', sa.Integer()), name='housenumber')\
165                .data([(n,) for n in numerals])
166     sql = sa.select(t.c.place_id, t.c.parent_place_id,
167                     sa.literal('W').label('osm_type'),
168                     sa.literal(osm_id).label('osm_id'),
169                     values.c.nr.label('housenumber'),
170                     _interpolated_position(t, values.c.nr),
171                     t.c.postcode)\
172             .where(t.c.place_id.in_(place_ids))\
173             .join(values, values.c.nr.between(t.c.startnumber, t.c.endnumber))
174
175     if details.geometry_output:
176         sub = sql.subquery()
177         sql = _add_geometry_columns(sa.select(sub), sub.c.centroid, details)
178
179     for row in await conn.execute(sql):
180         result = nres.create_from_tiger_row(row, nres.SearchResult)
181         assert result
182         yield result
183
184
185 class AbstractSearch(abc.ABC):
186     """ Encapuslation of a single lookup in the database.
187     """
188
189     def __init__(self, penalty: float) -> None:
190         self.penalty = penalty
191
192     @abc.abstractmethod
193     async def lookup(self, conn: SearchConnection,
194                      details: SearchDetails) -> nres.SearchResults:
195         """ Find results for the search in the database.
196         """
197
198
199 class NearSearch(AbstractSearch):
200     """ Category search of a place type near the result of another search.
201     """
202     def __init__(self, penalty: float, categories: WeightedCategories,
203                  search: AbstractSearch) -> None:
204         super().__init__(penalty)
205         self.search = search
206         self.categories = categories
207
208
209     async def lookup(self, conn: SearchConnection,
210                      details: SearchDetails) -> nres.SearchResults:
211         """ Find results for the search in the database.
212         """
213         results = nres.SearchResults()
214         base = await self.search.lookup(conn, details)
215
216         if not base:
217             return results
218
219         base.sort(key=lambda r: (r.accuracy, r.rank_search))
220         max_accuracy = base[0].accuracy + 0.5
221         base = nres.SearchResults(r for r in base if r.source_table == nres.SourceTable.PLACEX
222                                                      and r.accuracy <= max_accuracy
223                                                      and r.bbox and r.bbox.area < 20)
224
225         if base:
226             baseids = [b.place_id for b in base[:5] if b.place_id]
227
228             for category, penalty in self.categories:
229                 await self.lookup_category(results, conn, baseids, category, penalty, details)
230                 if len(results) >= details.max_results:
231                     break
232
233         return results
234
235
236     async def lookup_category(self, results: nres.SearchResults,
237                               conn: SearchConnection, ids: List[int],
238                               category: Tuple[str, str], penalty: float,
239                               details: SearchDetails) -> None:
240         """ Find places of the given category near the list of
241             place ids and add the results to 'results'.
242         """
243         table = await conn.get_class_table(*category)
244
245         t = conn.t.placex.alias('p')
246         tgeom = conn.t.placex.alias('pgeom')
247
248         sql = _select_placex(t).where(tgeom.c.place_id.in_(ids))\
249                                .where(t.c.class_ == category[0])\
250                                .where(t.c.type == category[1])
251
252         if table is None:
253             # No classtype table available, do a simplified lookup in placex.
254             sql = sql.join(tgeom, t.c.geometry.ST_DWithin(tgeom.c.centroid, 0.01))\
255                      .order_by(tgeom.c.centroid.ST_Distance(t.c.centroid))
256         else:
257             # Use classtype table. We can afford to use a larger
258             # radius for the lookup.
259             sql = sql.join(table, t.c.place_id == table.c.place_id)\
260                      .join(tgeom,
261                            sa.case((sa.and_(tgeom.c.rank_address < 9,
262                                             tgeom.c.geometry.ST_GeometryType().in_(
263                                                 ('ST_Polygon', 'ST_MultiPolygon'))),
264                                     tgeom.c.geometry.ST_Contains(table.c.centroid)),
265                                    else_ = tgeom.c.centroid.ST_DWithin(table.c.centroid, 0.05)))\
266                      .order_by(tgeom.c.centroid.ST_Distance(table.c.centroid))
267
268         if details.countries:
269             sql = sql.where(t.c.country_code.in_(details.countries))
270         if details.min_rank > 0:
271             sql = sql.where(t.c.rank_address >= details.min_rank)
272         if details.max_rank < 30:
273             sql = sql.where(t.c.rank_address <= details.max_rank)
274         if details.excluded:
275             sql = sql.where(t.c.place_id.not_in(details.excluded))
276         if details.layers is not None:
277             sql = sql.where(_filter_by_layer(t, details.layers))
278
279         for row in await conn.execute(sql.limit(details.max_results)):
280             result = nres.create_from_placex_row(row, nres.SearchResult)
281             assert result
282             result.accuracy = self.penalty + penalty
283             result.bbox = Bbox.from_wkb(row.bbox.data)
284             results.append(result)
285
286
287
288 class PoiSearch(AbstractSearch):
289     """ Category search in a geographic area.
290     """
291     def __init__(self, sdata: SearchData) -> None:
292         super().__init__(sdata.penalty)
293         self.categories = sdata.qualifiers
294         self.countries = sdata.countries
295
296
297     async def lookup(self, conn: SearchConnection,
298                      details: SearchDetails) -> nres.SearchResults:
299         """ Find results for the search in the database.
300         """
301         t = conn.t.placex
302
303         rows: List[SaRow] = []
304
305         if details.near and details.near_radius is not None and details.near_radius < 0.2:
306             # simply search in placex table
307             sql = _select_placex(t) \
308                       .where(t.c.linked_place_id == None) \
309                       .where(t.c.geometry.ST_DWithin(details.near.sql_value(),
310                                                      details.near_radius)) \
311                       .order_by(t.c.centroid.ST_Distance(details.near.sql_value()))
312
313             if self.countries:
314                 sql = sql.where(t.c.country_code.in_(self.countries.values))
315
316             if details.viewbox is not None and details.bounded_viewbox:
317                 sql = sql.where(t.c.geometry.intersects(details.viewbox.sql_value()))
318
319             classtype = self.categories.values
320             if len(classtype) == 1:
321                 sql = sql.where(t.c.class_ == classtype[0][0]) \
322                          .where(t.c.type == classtype[0][1])
323             else:
324                 sql = sql.where(sa.or_(*(sa.and_(t.c.class_ == cls, t.c.type == typ)
325                                          for cls, typ in classtype)))
326
327             rows.extend(await conn.execute(sql.limit(details.max_results)))
328         else:
329             # use the class type tables
330             for category in self.categories.values:
331                 table = await conn.get_class_table(*category)
332                 if table is not None:
333                     sql = _select_placex(t)\
334                                .join(table, t.c.place_id == table.c.place_id)\
335                                .where(t.c.class_ == category[0])\
336                                .where(t.c.type == category[1])
337
338                     if details.viewbox is not None and details.bounded_viewbox:
339                         sql = sql.where(table.c.centroid.intersects(details.viewbox.sql_value()))
340
341                     if details.near:
342                         sql = sql.order_by(table.c.centroid.ST_Distance(details.near.sql_value()))\
343                                  .where(table.c.centroid.ST_DWithin(details.near.sql_value(),
344                                                                     details.near_radius or 0.5))
345
346                     if self.countries:
347                         sql = sql.where(t.c.country_code.in_(self.countries.values))
348
349                     rows.extend(await conn.execute(sql.limit(details.max_results)))
350
351         results = nres.SearchResults()
352         for row in rows:
353             result = nres.create_from_placex_row(row, nres.SearchResult)
354             assert result
355             result.accuracy = self.penalty + self.categories.get_penalty((row.class_, row.type))
356             result.bbox = Bbox.from_wkb(row.bbox.data)
357             results.append(result)
358
359         return results
360
361
362 class CountrySearch(AbstractSearch):
363     """ Search for a country name or country code.
364     """
365     def __init__(self, sdata: SearchData) -> None:
366         super().__init__(sdata.penalty)
367         self.countries = sdata.countries
368
369
370     async def lookup(self, conn: SearchConnection,
371                      details: SearchDetails) -> nres.SearchResults:
372         """ Find results for the search in the database.
373         """
374         t = conn.t.placex
375
376         sql = _select_placex(t)\
377                 .where(t.c.country_code.in_(self.countries.values))\
378                 .where(t.c.rank_address == 4)
379
380         sql = _add_geometry_columns(sql, t.c.geometry, details)
381
382         if details.excluded:
383             sql = sql.where(t.c.place_id.not_in(details.excluded))
384
385         if details.viewbox is not None and details.bounded_viewbox:
386             sql = sql.where(t.c.geometry.intersects(details.viewbox.sql_value()))
387
388         if details.near is not None and details.near_radius is not None:
389             sql = sql.where(t.c.geometry.ST_DWithin(details.near.sql_value(),
390                                                     details.near_radius))
391
392         results = nres.SearchResults()
393         for row in await conn.execute(sql):
394             result = nres.create_from_placex_row(row, nres.SearchResult)
395             assert result
396             result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
397             results.append(result)
398
399         return results or await self.lookup_in_country_table(conn, details)
400
401
402     async def lookup_in_country_table(self, conn: SearchConnection,
403                                       details: SearchDetails) -> nres.SearchResults:
404         """ Look up the country in the fallback country tables.
405         """
406         # Avoid the fallback search when this is a more search. Country results
407         # usually are in the first batch of results and it is not possible
408         # to exclude these fallbacks.
409         if details.excluded:
410             return nres.SearchResults()
411
412         t = conn.t.country_name
413         tgrid = conn.t.country_grid
414
415         sql = sa.select(tgrid.c.country_code,
416                         tgrid.c.geometry.ST_Centroid().ST_Collect().ST_Centroid()
417                               .label('centroid'))\
418                 .where(tgrid.c.country_code.in_(self.countries.values))\
419                 .group_by(tgrid.c.country_code)
420
421         if details.viewbox is not None and details.bounded_viewbox:
422             sql = sql.where(tgrid.c.geometry.intersects(details.viewbox.sql_value()))
423         if details.near is not None and details.near_radius is not None:
424             sql = sql.where(tgrid.c.geometry.ST_DWithin(details.near.sql_value(),
425                                                         details.near_radius))
426
427         sub = sql.subquery('grid')
428
429         sql = sa.select(t.c.country_code,
430                         (t.c.name
431                          + sa.func.coalesce(t.c.derived_name,
432                                             sa.cast('', type_=conn.t.types.Composite))
433                         ).label('name'),
434                         sub.c.centroid)\
435                 .join(sub, t.c.country_code == sub.c.country_code)
436
437         results = nres.SearchResults()
438         for row in await conn.execute(sql):
439             result = nres.create_from_country_row(row, nres.SearchResult)
440             assert result
441             result.accuracy = self.penalty + self.countries.get_penalty(row.country_code, 5.0)
442             results.append(result)
443
444         return results
445
446
447
448 class PostcodeSearch(AbstractSearch):
449     """ Search for a postcode.
450     """
451     def __init__(self, extra_penalty: float, sdata: SearchData) -> None:
452         super().__init__(sdata.penalty + extra_penalty)
453         self.countries = sdata.countries
454         self.postcodes = sdata.postcodes
455         self.lookups = sdata.lookups
456         self.rankings = sdata.rankings
457
458
459     async def lookup(self, conn: SearchConnection,
460                      details: SearchDetails) -> nres.SearchResults:
461         """ Find results for the search in the database.
462         """
463         t = conn.t.postcode
464
465         sql = sa.select(t.c.place_id, t.c.parent_place_id,
466                         t.c.rank_search, t.c.rank_address,
467                         t.c.postcode, t.c.country_code,
468                         t.c.geometry.label('centroid'))\
469                 .where(t.c.postcode.in_(self.postcodes.values))
470
471         sql = _add_geometry_columns(sql, t.c.geometry, details)
472
473         penalty: SaExpression = sa.literal(self.penalty)
474
475         if details.viewbox is not None:
476             if details.bounded_viewbox:
477                 sql = sql.where(t.c.geometry.intersects(details.viewbox.sql_value()))
478             else:
479                 penalty += sa.case((t.c.geometry.intersects(details.viewbox.sql_value()), 0.0),
480                                    (t.c.geometry.intersects(details.viewbox_x2.sql_value()), 1.0),
481                                    else_=2.0)
482
483         if details.near is not None:
484             if details.near_radius is not None:
485                 sql = sql.where(t.c.geometry.ST_DWithin(details.near.sql_value(),
486                                                         details.near_radius))
487             sql = sql.order_by(t.c.geometry.ST_Distance(details.near.sql_value()))
488
489         if self.countries:
490             sql = sql.where(t.c.country_code.in_(self.countries.values))
491
492         if details.excluded:
493             sql = sql.where(t.c.place_id.not_in(details.excluded))
494
495         if self.lookups:
496             assert len(self.lookups) == 1
497             assert self.lookups[0].lookup_type == 'restrict'
498             tsearch = conn.t.search_name
499             sql = sql.where(tsearch.c.place_id == t.c.parent_place_id)\
500                      .where(sa.func.array_cat(tsearch.c.name_vector,
501                                               tsearch.c.nameaddress_vector,
502                                               type_=ARRAY(sa.Integer))
503                                     .contains(self.lookups[0].tokens))
504
505         for ranking in self.rankings:
506             penalty += ranking.sql_penalty(conn.t.search_name)
507         penalty += sa.case(*((t.c.postcode == v, p) for v, p in self.postcodes),
508                        else_=1.0)
509
510
511         sql = sql.add_columns(penalty.label('accuracy'))
512         sql = sql.order_by('accuracy')
513
514         results = nres.SearchResults()
515         for row in await conn.execute(sql.limit(details.max_results)):
516             result = nres.create_from_postcode_row(row, nres.SearchResult)
517             assert result
518             result.accuracy = row.accuracy
519             results.append(result)
520
521         return results
522
523
524
525 class PlaceSearch(AbstractSearch):
526     """ Generic search for an address or named place.
527     """
528     def __init__(self, extra_penalty: float, sdata: SearchData, expected_count: int) -> None:
529         super().__init__(sdata.penalty + extra_penalty)
530         self.countries = sdata.countries
531         self.postcodes = sdata.postcodes
532         self.housenumbers = sdata.housenumbers
533         self.qualifiers = sdata.qualifiers
534         self.lookups = sdata.lookups
535         self.rankings = sdata.rankings
536         self.expected_count = expected_count
537
538
539     async def lookup(self, conn: SearchConnection,
540                      details: SearchDetails) -> nres.SearchResults:
541         """ Find results for the search in the database.
542         """
543         t = conn.t.placex.alias('p')
544         tsearch = conn.t.search_name.alias('s')
545         limit = details.max_results
546
547         sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
548                         t.c.class_, t.c.type,
549                         t.c.address, t.c.extratags,
550                         t.c.housenumber, t.c.postcode, t.c.country_code,
551                         t.c.wikipedia,
552                         t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
553                         t.c.centroid,
554                         t.c.geometry.ST_Expand(0).label('bbox'))\
555                 .where(t.c.place_id == tsearch.c.place_id)
556
557
558         sql = _add_geometry_columns(sql, t.c.geometry, details)
559
560         penalty: SaExpression = sa.literal(self.penalty)
561         for ranking in self.rankings:
562             penalty += ranking.sql_penalty(tsearch)
563
564         for lookup in self.lookups:
565             sql = sql.where(lookup.sql_condition(tsearch))
566
567         if self.countries:
568             sql = sql.where(tsearch.c.country_code.in_(self.countries.values))
569
570         if self.postcodes:
571             # if a postcode is given, don't search for state or country level objects
572             sql = sql.where(tsearch.c.address_rank > 9)
573             tpc = conn.t.postcode
574             if self.expected_count > 1000:
575                 # Many results expected. Restrict by postcode.
576                 sql = sql.where(sa.select(tpc.c.postcode)
577                                   .where(tpc.c.postcode.in_(self.postcodes.values))
578                                   .where(tsearch.c.centroid.ST_DWithin(tpc.c.geometry, 0.12))
579                                   .exists())
580
581             # Less results, only have a preference for close postcodes
582             pc_near = sa.select(sa.func.min(tpc.c.geometry.ST_Distance(tsearch.c.centroid)))\
583                       .where(tpc.c.postcode.in_(self.postcodes.values))\
584                       .scalar_subquery()
585             penalty += sa.case((t.c.postcode.in_(self.postcodes.values), 0.0),
586                                else_=sa.func.coalesce(pc_near, 2.0))
587
588         if details.viewbox is not None:
589             if details.bounded_viewbox:
590                 sql = sql.where(tsearch.c.centroid.intersects(details.viewbox.sql_value()))
591             else:
592                 penalty += sa.case((t.c.geometry.intersects(details.viewbox.sql_value()), 0.0),
593                                    (t.c.geometry.intersects(details.viewbox_x2.sql_value()), 1.0),
594                                    else_=2.0)
595
596         if details.near is not None:
597             if details.near_radius is not None:
598                 sql = sql.where(tsearch.c.centroid.ST_DWithin(details.near.sql_value(),
599                                                          details.near_radius))
600             sql = sql.add_columns(-tsearch.c.centroid.ST_Distance(details.near.sql_value())
601                                       .label('importance'))
602             sql = sql.order_by(sa.desc(sa.text('importance')))
603         else:
604             sql = sql.order_by(penalty - sa.case((tsearch.c.importance > 0, tsearch.c.importance),
605                                   else_=0.75001-(sa.cast(tsearch.c.search_rank, sa.Float())/40)))
606             sql = sql.add_columns(t.c.importance)
607
608
609         sql = sql.add_columns(penalty.label('accuracy'))\
610                  .order_by(sa.text('accuracy'))
611
612         if self.housenumbers:
613             hnr_regexp = f"\\m({'|'.join(self.housenumbers.values)})\\M"
614             sql = sql.where(tsearch.c.address_rank.between(16, 30))\
615                      .where(sa.or_(tsearch.c.address_rank < 30,
616                                   t.c.housenumber.regexp_match(hnr_regexp, flags='i')))
617
618             # Cross check for housenumbers, need to do that on a rather large
619             # set. Worst case there are 40.000 main streets in OSM.
620             inner = sql.limit(10000).subquery()
621
622             # Housenumbers from placex
623             thnr = conn.t.placex.alias('hnr')
624             pid_list = array_agg(thnr.c.place_id) # type: ignore[no-untyped-call]
625             place_sql = sa.select(pid_list)\
626                           .where(thnr.c.parent_place_id == inner.c.place_id)\
627                           .where(thnr.c.housenumber.regexp_match(hnr_regexp, flags='i'))\
628                           .where(thnr.c.linked_place_id == None)\
629                           .where(thnr.c.indexed_status == 0)
630
631             if details.excluded:
632                 place_sql = place_sql.where(thnr.c.place_id.not_in(details.excluded))
633             if self.qualifiers:
634                 place_sql = place_sql.where(self.qualifiers.sql_restrict(thnr))
635
636             numerals = [int(n) for n in self.housenumbers.values if n.isdigit()]
637             interpol_sql: SaExpression
638             tiger_sql: SaExpression
639             if numerals and \
640                (not self.qualifiers or ('place', 'house') in self.qualifiers.values):
641                 # Housenumbers from interpolations
642                 interpol_sql = _make_interpolation_subquery(conn.t.osmline, inner,
643                                                             numerals, details)
644                 # Housenumbers from Tiger
645                 tiger_sql = sa.case((inner.c.country_code == 'us',
646                                      _make_interpolation_subquery(conn.t.tiger, inner,
647                                                                   numerals, details)
648                                     ), else_=None)
649             else:
650                 interpol_sql = sa.literal_column('NULL')
651                 tiger_sql = sa.literal_column('NULL')
652
653             unsort = sa.select(inner, place_sql.scalar_subquery().label('placex_hnr'),
654                                interpol_sql.label('interpol_hnr'),
655                                tiger_sql.label('tiger_hnr')).subquery('unsort')
656             sql = sa.select(unsort)\
657                     .order_by(sa.case((unsort.c.placex_hnr != None, 1),
658                                       (unsort.c.interpol_hnr != None, 2),
659                                       (unsort.c.tiger_hnr != None, 3),
660                                       else_=4),
661                               unsort.c.accuracy)
662         else:
663             sql = sql.where(t.c.linked_place_id == None)\
664                      .where(t.c.indexed_status == 0)
665             if self.qualifiers:
666                 sql = sql.where(self.qualifiers.sql_restrict(t))
667             if details.excluded:
668                 sql = sql.where(tsearch.c.place_id.not_in(details.excluded))
669             if details.min_rank > 0:
670                 sql = sql.where(sa.or_(tsearch.c.address_rank >= details.min_rank,
671                                        tsearch.c.search_rank >= details.min_rank))
672             if details.max_rank < 30:
673                 sql = sql.where(sa.or_(tsearch.c.address_rank <= details.max_rank,
674                                        tsearch.c.search_rank <= details.max_rank))
675             if details.layers is not None:
676                 sql = sql.where(_filter_by_layer(t, details.layers))
677
678
679         results = nres.SearchResults()
680         for row in await conn.execute(sql.limit(limit)):
681             result = nres.create_from_placex_row(row, nres.SearchResult)
682             assert result
683             result.bbox = Bbox.from_wkb(row.bbox.data)
684             result.accuracy = row.accuracy
685             if not details.excluded or not result.place_id in details.excluded:
686                 results.append(result)
687
688             if self.housenumbers and row.rank_address < 30:
689                 if row.placex_hnr:
690                     subs = _get_placex_housenumbers(conn, row.placex_hnr, details)
691                 elif row.interpol_hnr:
692                     subs = _get_osmline(conn, row.interpol_hnr, numerals, details)
693                 elif row.tiger_hnr:
694                     subs = _get_tiger(conn, row.tiger_hnr, numerals, row.osm_id, details)
695                 else:
696                     subs = None
697
698                 if subs is not None:
699                     async for sub in subs:
700                         assert sub.housenumber
701                         sub.accuracy = result.accuracy
702                         if not any(nr in self.housenumbers.values
703                                    for nr in sub.housenumber.split(';')):
704                             sub.accuracy += 0.6
705                         results.append(sub)
706
707                 result.accuracy += 1.0 # penalty for missing housenumber
708
709         return results