# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator
import heapq

from nominatim.api.types import SearchDetails, DataLayer
from nominatim.api.search.query import QueryStruct, TokenType, TokenRange, BreakType
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs


def wrap_near_search(categories: List[Tuple[str, str]],
                     search: dbs.AbstractSearch) -> dbs.NearSearch:
    """ Create a new search that wraps the given search in a search
        for nearby places of the given categories.
    """
    return dbs.NearSearch(penalty=search.penalty,
                          categories=dbf.WeightedCategories(categories,
                                                            [0.0] * len(categories)),
                          search=search)


def build_poi_search(category: List[Tuple[str, str]],
                     countries: Optional[List[str]]) -> dbs.PoiSearch:
    """ Create a new search for places of the given category, possibly
        constrained to the given countries.
    """
    if countries:
        ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
    else:
        ccs = dbf.WeightedStrings([], [])

    class _PoiData(dbf.SearchData):
        penalty = 0.0
        qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
        countries = ccs

    return dbs.PoiSearch(_PoiData())


class SearchBuilder:
    """ Build the abstract search queries from token assignments.
    """

    def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
        self.query = query
        self.details = details
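
    # The configured_for_* properties below check the permitted rank range
    # against the address ranks of the respective object types: countries
    # have address rank 4, postcodes fall into the range 5 to 11 and house
    # numbers have rank 30.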

    @property
    def configured_for_country(self) -> bool:
        """ Return true if the search details are configured to
            allow countries in the result.
        """
        return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    @property
    def configured_for_postcode(self) -> bool:
        """ Return true if the search details are configured to
            allow postcodes in the result.
        """
        return self.details.min_rank <= 5 and self.details.max_rank >= 11 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    @property
    def configured_for_housenumbers(self) -> bool:
        """ Return true if the search details are configured to
            allow addresses in the result.
        """
        return self.details.max_rank >= 30 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
        """ Yield all possible abstract searches for the given token assignment.
        """
        sdata = self.get_search_data(assignment)
        if sdata is None:
            return

        categories = self.get_search_categories(assignment)
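
        # Queries without a name part become pure POI searches when only
        # categories are given, or special searches otherwise. Everything
        # else goes through the regular name search.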
        if assignment.name is None:
            if categories and not sdata.postcodes:
                sdata.qualifiers = categories
                categories = None
                builder = self.build_poi_search(sdata)
            else:
                builder = self.build_special_search(sdata, assignment.address,
                                                    bool(categories))
        else:
            builder = self.build_name_search(sdata, assignment.name, assignment.address,
                                             bool(categories))
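
        # Any categories left over at this point turn each search into a
        # near search for places of those categories. The smallest category
        # penalty is factored out into the penalty of the near search.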
        if categories:
            penalty = min(categories.penalties)
            categories.penalties = [p - penalty for p in categories.penalties]
            for search in builder:
                yield dbs.NearSearch(penalty, categories, search)
        else:
            yield from builder


    def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
        """ Build an abstract search query for a simple category search.
            This kind of search requires an additional geographic constraint.
        """
        if not sdata.housenumbers \
           and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
            yield dbs.PoiSearch(sdata)


    def build_special_search(self, sdata: dbf.SearchData,
                             address: List[TokenRange],
                             is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for searches that do not involve
            a named place.
        """
        if sdata.qualifiers or sdata.housenumbers:
            # No special searches over house numbers or qualifiers supported.
            return

        if sdata.countries and not address and not sdata.postcodes \
           and self.configured_for_country:
            yield dbs.CountrySearch(sdata)

        if sdata.postcodes and (is_category or self.configured_for_postcode):
            if address:
                sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
                                                 [t.token for r in address
                                                  for t in self.query.get_partials_list(r)],
                                                 'restrict')]
            yield dbs.PostcodeSearch(0.4, sdata)


    def build_name_search(self, sdata: dbf.SearchData,
                          name: TokenRange, address: List[TokenRange],
                          is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for simple name or address searches.
        """
        if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
            ranking = self.get_name_ranking(name)
            name_penalty = ranking.normalize_penalty()
            if ranking.rankings:
                sdata.rankings.append(ranking)
            for penalty, count, lookup in self.yield_lookups(name, address):
                sdata.lookups = lookup
                yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)


    def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
            -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
        """ Yield all variants of how the given name and address should best
            be searched for. This takes into account how frequent the terms
            are and tries to find a lookup that optimizes index use.
        """
        penalty = 0.0 # extra penalty currently unused

        name_partials = self.query.get_partials_list(name)
        exp_name_count = min(t.count for t in name_partials)
        addr_partials = []
        for trange in address:
            addr_partials.extend(self.query.get_partials_list(trange))
        addr_tokens = [t.token for t in addr_partials]
        partials_indexed = all(t.is_indexed for t in name_partials) \
                           and all(t.is_indexed for t in addr_partials)
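
        # exp_name_count (the count of the rarest name partial) estimates
        # how many candidate rows a lookup via the name index has to scan.
        # The strategies below choose the lookup so that this stays small.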
        if (len(name_partials) > 3 or exp_name_count < 1000) and partials_indexed:
            # Lookup by name partials, use address partials to restrict results.
            lookup = [dbf.FieldLookup('name_vector',
                                      [t.token for t in name_partials], 'lookup_all')]
            if addr_tokens:
                lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
            yield penalty, exp_name_count, lookup
            return

        exp_addr_count = min(t.count for t in addr_partials) if addr_partials else exp_name_count
        if exp_addr_count < 1000 and partials_indexed:
            # Lookup by address partials and restrict results through name terms.
            yield penalty, exp_addr_count, \
                  [dbf.FieldLookup('name_vector', [t.token for t in name_partials], 'restrict'),
                   dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]
            return

        # Partial terms are too frequent. Try looking up by rare full names first.
        name_fulls = self.query.get_tokens(name, TokenType.WORD)
        rare_names = list(filter(lambda t: t.count < 1000, name_fulls))
        # At this point drop unindexed partials from the address.
        # This might yield wrong results, nothing we can do about that.
        if not partials_indexed:
            addr_tokens = [t.token for t in addr_partials if t.is_indexed]
            penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
        if rare_names:
            # Any of the full names applies with all of the partials from the address.
            lookup = [dbf.FieldLookup('name_vector', [t.token for t in rare_names], 'lookup_any')]
            if addr_tokens:
                lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
            yield penalty, sum(t.count for t in rare_names), lookup

        # To catch remaining results, lookup by name and address together.
        if all(t.is_indexed for t in name_partials):
            lookup = [dbf.FieldLookup('name_vector',
                                      [t.token for t in name_partials], 'lookup_all')]
        else:
            # We don't have the partials, try with the non-rare names.
            non_rare_names = [t.token for t in name_fulls if t.count >= 1000]
            if not non_rare_names:
                return
            lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
        if addr_tokens:
            lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
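        # The fewer terms the query has, the less selective this catch-all
        # lookup becomes, so short queries get a small extra penalty.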
        yield penalty + 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens)), \
              min(exp_name_count, exp_addr_count), lookup


    def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a ranking expression for a name term in the given range.
        """
        name_fulls = self.query.get_tokens(trange, TokenType.WORD)
        ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
        ranks.sort(key=lambda r: r.penalty)
        # Fallback: sum of penalties of partials
        name_partials = self.query.get_partials_list(trange)
        default = sum(t.penalty for t in name_partials) + 0.2
        return dbf.FieldRanking('name_vector', default, ranks)


    def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a list of ranking expressions for an address term
            in the given range.
        """
        todo: List[Tuple[int, int, dbf.RankedTokens]] = []
        heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
        ranks: List[dbf.RankedTokens] = []
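
        # Best-first search over the possible decompositions of the range
        # into partial and full-word tokens. Heap entries are (negative
        # token count, next node position, rank so far), so decompositions
        # with more tokens are expanded first.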

        while todo: # pylint: disable=too-many-nested-blocks
            neglen, pos, rank = heapq.heappop(todo)
            for tlist in self.query.nodes[pos].starting:
                if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
                    if tlist.end < trange.end:
                        chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
                        if tlist.ttype == TokenType.PARTIAL:
                            penalty = rank.penalty + chgpenalty \
                                      + max(t.penalty for t in tlist.tokens)
                            heapq.heappush(todo, (neglen - 1, tlist.end,
                                                  dbf.RankedTokens(penalty, rank.tokens)))
                        else:
                            for t in tlist.tokens:
                                heapq.heappush(todo, (neglen - 1, tlist.end,
                                                      rank.with_token(t, chgpenalty)))
                    elif tlist.end == trange.end:
                        if tlist.ttype == TokenType.PARTIAL:
                            ranks.append(dbf.RankedTokens(rank.penalty
                                                          + max(t.penalty for t in tlist.tokens),
                                                          rank.tokens))
                        else:
                            ranks.extend(rank.with_token(t, 0.0) for t in tlist.tokens)
                        if len(ranks) >= 10:
                            # Too many variants, bail out and only add the
                            # worst-case fallback: sum of penalties of partials.
                            name_partials = self.query.get_partials_list(trange)
                            default = sum(t.penalty for t in name_partials) + 0.2
                            ranks.append(dbf.RankedTokens(rank.penalty + default, []))
                            # Bail out of the outer loop as well.
                            todo.clear()
                            break
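
        # The variant with the fewest tokens (plus an offset) becomes the
        # default penalty; the remaining variants are ranked by penalty.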

        ranks.sort(key=lambda r: len(r.tokens))
        default = ranks[0].penalty + 0.3
        del ranks[0]
        ranks.sort(key=lambda r: r.penalty)

        return dbf.FieldRanking('nameaddress_vector', default, ranks)


    def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
        """ Collect the tokens for the non-name search fields in the
            assignment.
        """
        sdata = dbf.SearchData()
        sdata.penalty = assignment.penalty
        if assignment.country:
            tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
            if self.details.countries:
                tokens = [t for t in tokens if t.lookup_word in self.details.countries]
                if not tokens:
                    return None
            sdata.set_strings('countries', tokens)
        elif self.details.countries:
            sdata.countries = dbf.WeightedStrings(self.details.countries,
                                                  [0.0] * len(self.details.countries))
        if assignment.housenumber:
            sdata.set_strings('housenumbers',
                              self.query.get_tokens(assignment.housenumber,
                                                    TokenType.HOUSENUMBER))
        if assignment.postcode:
            sdata.set_strings('postcodes',
                              self.query.get_tokens(assignment.postcode,
                                                    TokenType.POSTCODE))
        if assignment.qualifier:
            sdata.set_qualifiers(self.query.get_tokens(assignment.qualifier,
                                                       TokenType.QUALIFIER))

        if assignment.address:
            sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
        else:
            sdata.rankings = []

        return sdata


    def get_search_categories(self,
                              assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
        """ Collect tokens for category search or use the categories
            requested per parameter.
            Returns None if no category search is requested.
        """
        if assignment.category:
            tokens = [t for t in self.query.get_tokens(assignment.category,
                                                       TokenType.CATEGORY)
                      if not self.details.categories
                         or t.get_category() in self.details.categories]
            return dbf.WeightedCategories([t.get_category() for t in tokens],
                                          [t.penalty for t in tokens])

        if self.details.categories:
            return dbf.WeightedCategories(self.details.categories,
                                          [0.0] * len(self.details.categories))

        return None
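

# Penalty applied in get_addr_ranking() when a chain of partial tokens
# continues across a break of the given type.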
PENALTY_WORDCHANGE = {
    BreakType.START: 0.0,
    BreakType.END: 0.0,
    BreakType.PHRASE: 0.0,
    BreakType.WORD: 0.1,
    BreakType.PART: 0.2,
    BreakType.TOKEN: 0.4
}