1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
Conversion from token assignment to an abstract DB search.
10 from typing import Optional, List, Tuple, Iterator
13 from nominatim.api.types import SearchDetails, DataLayer
14 from nominatim.api.search.query import QueryStruct, Token, TokenType, TokenRange, BreakType
15 from nominatim.api.search.token_assignment import TokenAssignment
16 import nominatim.api.search.db_search_fields as dbf
17 import nominatim.api.search.db_searches as dbs
18 from nominatim.api.logging import log
def wrap_near_search(categories: List[Tuple[str, str]],
                     search: dbs.AbstractSearch) -> dbs.NearSearch:
    """ Create a new search that wraps the given search in a search
        for near places of the given category.
    """
    # All categories get weight 0.0; the inner search's penalty is carried
    # over so the relative ordering of searches is preserved.
    return dbs.NearSearch(penalty=search.penalty,
                          categories=dbf.WeightedCategories(categories,
                                                            [0.0] * len(categories)),
                          search=search)
def build_poi_search(category: List[Tuple[str, str]],
                     countries: Optional[List[str]]) -> dbs.PoiSearch:
    """ Create a new search for places by the given category, possibly
        constraint to the given countries.
    """
    if countries:
        ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
    else:
        # No country filter requested.
        ccs = dbf.WeightedStrings([], [])

    class _PoiData(dbf.SearchData):
        """ Minimal search data for a bare category search. """
        penalty = 0.0
        qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
        countries = ccs

    return dbs.PoiSearch(_PoiData())
51 """ Build the abstract search queries from token assignments.
54 def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
56 self.details = details
60 def configured_for_country(self) -> bool:
61 """ Return true if the search details are configured to
62 allow countries in the result.
64 return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
65 and self.details.layer_enabled(DataLayer.ADDRESS)
69 def configured_for_postcode(self) -> bool:
70 """ Return true if the search details are configured to
71 allow postcodes in the result.
73 return self.details.min_rank <= 5 and self.details.max_rank >= 11\
74 and self.details.layer_enabled(DataLayer.ADDRESS)
78 def configured_for_housenumbers(self) -> bool:
79 """ Return true if the search details are configured to
80 allow addresses in the result.
82 return self.details.max_rank >= 30 \
83 and self.details.layer_enabled(DataLayer.ADDRESS)
86 def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
87 """ Yield all possible abstract searches for the given token assignment.
89 sdata = self.get_search_data(assignment)
93 categories = self.get_search_categories(assignment)
95 if assignment.name is None:
96 if categories and not sdata.postcodes:
97 sdata.qualifiers = categories
99 builder = self.build_poi_search(sdata)
100 elif assignment.housenumber:
101 hnr_tokens = self.query.get_tokens(assignment.housenumber,
102 TokenType.HOUSENUMBER)
103 builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
105 builder = self.build_special_search(sdata, assignment.address,
108 builder = self.build_name_search(sdata, assignment.name, assignment.address,
112 penalty = min(categories.penalties)
113 categories.penalties = [p - penalty for p in categories.penalties]
114 for search in builder:
115 yield dbs.NearSearch(penalty, categories, search)
120 def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
121 """ Build abstract search query for a simple category search.
122 This kind of search requires an additional geographic constraint.
124 if not sdata.housenumbers \
125 and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
126 yield dbs.PoiSearch(sdata)
129 def build_special_search(self, sdata: dbf.SearchData,
130 address: List[TokenRange],
131 is_category: bool) -> Iterator[dbs.AbstractSearch]:
132 """ Build abstract search queries for searches that do not involve
136 # No special searches over qualifiers supported.
139 if sdata.countries and not address and not sdata.postcodes \
140 and self.configured_for_country:
141 yield dbs.CountrySearch(sdata)
143 if sdata.postcodes and (is_category or self.configured_for_postcode):
144 penalty = 0.0 if sdata.countries else 0.1
146 sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
147 [t.token for r in address
148 for t in self.query.get_partials_list(r)],
151 yield dbs.PostcodeSearch(penalty, sdata)
154 def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
155 address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
156 """ Build a simple address search for special entries where the
157 housenumber is the main name token.
159 partial_tokens: List[int] = []
160 for trange in address:
161 partial_tokens.extend(t.token for t in self.query.get_partials_list(trange))
163 sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any'),
164 dbf.FieldLookup('nameaddress_vector', partial_tokens, 'lookup_all')
166 sdata.housenumbers = dbf.WeightedStrings([], [])
167 yield dbs.PlaceSearch(0.05, sdata, sum(t.count for t in hnrs))
170 def build_name_search(self, sdata: dbf.SearchData,
171 name: TokenRange, address: List[TokenRange],
172 is_category: bool) -> Iterator[dbs.AbstractSearch]:
173 """ Build abstract search queries for simple name or address searches.
175 if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
176 ranking = self.get_name_ranking(name)
177 name_penalty = ranking.normalize_penalty()
179 sdata.rankings.append(ranking)
180 for penalty, count, lookup in self.yield_lookups(name, address):
181 sdata.lookups = lookup
182 yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
185 def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
186 -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
187 """ Yield all variants how the given name and address should best
188 be searched for. This takes into account how frequent the terms
189 are and tries to find a lookup that optimizes index use.
191 penalty = 0.0 # extra penalty currently unused
193 name_partials = self.query.get_partials_list(name)
194 exp_name_count = min(t.count for t in name_partials)
196 for trange in address:
197 addr_partials.extend(self.query.get_partials_list(trange))
198 addr_tokens = [t.token for t in addr_partials]
199 partials_indexed = all(t.is_indexed for t in name_partials) \
200 and all(t.is_indexed for t in addr_partials)
202 if (len(name_partials) > 3 or exp_name_count < 1000) and partials_indexed:
203 # Lookup by name partials, use address partials to restrict results.
204 lookup = [dbf.FieldLookup('name_vector',
205 [t.token for t in name_partials], 'lookup_all')]
207 lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
208 yield penalty, exp_name_count, lookup
211 exp_addr_count = min(t.count for t in addr_partials) if addr_partials else exp_name_count
212 if exp_addr_count < 1000 and partials_indexed:
213 # Lookup by address partials and restrict results through name terms.
214 # Give this a small penalty because lookups in the address index are
216 yield penalty + exp_addr_count/5000, exp_addr_count,\
217 [dbf.FieldLookup('name_vector', [t.token for t in name_partials], 'restrict'),
218 dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]
221 # Partial term to frequent. Try looking up by rare full names first.
222 name_fulls = self.query.get_tokens(name, TokenType.WORD)
223 rare_names = list(filter(lambda t: t.count < 10000, name_fulls))
224 # At this point drop unindexed partials from the address.
225 # This might yield wrong results, nothing we can do about that.
226 if not partials_indexed:
227 addr_tokens = [t.token for t in addr_partials if t.is_indexed]
228 penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
230 # Any of the full names applies with all of the partials from the address
231 lookup = [dbf.FieldLookup('name_vector', [t.token for t in rare_names], 'lookup_any')]
233 lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
234 yield penalty, sum(t.count for t in rare_names), lookup
236 # To catch remaining results, lookup by name and address
237 # We only do this if there is a reasonable number of results expected.
238 if min(exp_name_count, exp_addr_count) < 10000:
239 if all(t.is_indexed for t in name_partials):
240 lookup = [dbf.FieldLookup('name_vector',
241 [t.token for t in name_partials], 'lookup_all')]
243 # we don't have the partials, try with the non-rare names
244 non_rare_names = [t.token for t in name_fulls if t.count >= 1000]
245 if not non_rare_names:
247 lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
249 lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
250 penalty += 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens))
251 if len(rare_names) == len(name_fulls):
252 # if there already was a search for all full tokens,
253 # avoid this if anything has been found
255 yield penalty, min(exp_name_count, exp_addr_count), lookup
258 def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
259 """ Create a ranking expression for a name term in the given range.
261 name_fulls = self.query.get_tokens(trange, TokenType.WORD)
262 ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
263 ranks.sort(key=lambda r: r.penalty)
264 # Fallback, sum of penalty for partials
265 name_partials = self.query.get_partials_list(trange)
266 default = sum(t.penalty for t in name_partials) + 0.2
267 return dbf.FieldRanking('name_vector', default, ranks)
270 def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
271 """ Create a list of ranking expressions for an address term
272 for the given ranges.
274 todo: List[Tuple[int, int, dbf.RankedTokens]] = []
275 heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
276 ranks: List[dbf.RankedTokens] = []
278 while todo: # pylint: disable=too-many-nested-blocks
279 neglen, pos, rank = heapq.heappop(todo)
280 for tlist in self.query.nodes[pos].starting:
281 if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
282 if tlist.end < trange.end:
283 chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
284 if tlist.ttype == TokenType.PARTIAL:
285 penalty = rank.penalty + chgpenalty \
286 + max(t.penalty for t in tlist.tokens)
287 heapq.heappush(todo, (neglen - 1, tlist.end,
288 dbf.RankedTokens(penalty, rank.tokens)))
290 for t in tlist.tokens:
291 heapq.heappush(todo, (neglen - 1, tlist.end,
292 rank.with_token(t, chgpenalty)))
293 elif tlist.end == trange.end:
294 if tlist.ttype == TokenType.PARTIAL:
295 ranks.append(dbf.RankedTokens(rank.penalty
296 + max(t.penalty for t in tlist.tokens),
299 ranks.extend(rank.with_token(t, 0.0) for t in tlist.tokens)
301 # Too many variants, bail out and only add
302 # Worst-case Fallback: sum of penalty of partials
303 name_partials = self.query.get_partials_list(trange)
304 default = sum(t.penalty for t in name_partials) + 0.2
305 ranks.append(dbf.RankedTokens(rank.penalty + default, []))
306 # Bail out of outer loop
310 ranks.sort(key=lambda r: len(r.tokens))
311 default = ranks[0].penalty + 0.3
313 ranks.sort(key=lambda r: r.penalty)
315 return dbf.FieldRanking('nameaddress_vector', default, ranks)
318 def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
319 """ Collect the tokens for the non-name search fields in the
322 sdata = dbf.SearchData()
323 sdata.penalty = assignment.penalty
324 if assignment.country:
325 tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
326 if self.details.countries:
327 tokens = [t for t in tokens if t.lookup_word in self.details.countries]
330 sdata.set_strings('countries', tokens)
331 elif self.details.countries:
332 sdata.countries = dbf.WeightedStrings(self.details.countries,
333 [0.0] * len(self.details.countries))
334 if assignment.housenumber:
335 sdata.set_strings('housenumbers',
336 self.query.get_tokens(assignment.housenumber,
337 TokenType.HOUSENUMBER))
338 if assignment.postcode:
339 sdata.set_strings('postcodes',
340 self.query.get_tokens(assignment.postcode,
342 if assignment.qualifier:
343 sdata.set_qualifiers(self.query.get_tokens(assignment.qualifier,
344 TokenType.QUALIFIER))
346 if assignment.address:
347 sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
354 def get_search_categories(self,
355 assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
356 """ Collect tokens for category search or use the categories
357 requested per parameter.
358 Returns None if no category search is requested.
360 if assignment.category:
361 tokens = [t for t in self.query.get_tokens(assignment.category,
363 if not self.details.categories
364 or t.get_category() in self.details.categories]
365 return dbf.WeightedCategories([t.get_category() for t in tokens],
366 [t.penalty for t in tokens])
368 if self.details.categories:
369 return dbf.WeightedCategories(self.details.categories,
370 [0.0] * len(self.details.categories))
# Penalty applied in get_addr_ranking when a token list ends on the given
# break type, i.e. when the word grouping changes mid-range.
# NOTE(review): entries other than START/PHRASE were reconstructed — confirm
# the exact weights against the project history.
PENALTY_WORDCHANGE = {
    BreakType.START: 0.0,
    BreakType.END: 0.0,
    BreakType.PHRASE: 0.0,
    BreakType.WORD: 0.1,
    BreakType.PART: 0.2,
    BreakType.TOKEN: 0.4
}