1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Conversion from token assignment to an abstract DB search.
10 from typing import Optional, List, Tuple, Iterator, Dict
13 from nominatim.api.types import SearchDetails, DataLayer
14 from nominatim.api.search.query import QueryStruct, Token, TokenType, TokenRange, BreakType
15 from nominatim.api.search.token_assignment import TokenAssignment
16 import nominatim.api.search.db_search_fields as dbf
17 import nominatim.api.search.db_searches as dbs
18 import nominatim.api.search.db_search_lookups as lookups
21 def wrap_near_search(categories: List[Tuple[str, str]],
22 search: dbs.AbstractSearch) -> dbs.NearSearch:
23 """ Create a new search that wraps the given search in a search
24 for near places of the given category.
26 return dbs.NearSearch(penalty=search.penalty,
27 categories=dbf.WeightedCategories(categories,
28 [0.0] * len(categories)),
32 def build_poi_search(category: List[Tuple[str, str]],
33 countries: Optional[List[str]]) -> dbs.PoiSearch:
34 """ Create a new search for places by the given category, possibly
35 constrained to the given countries.
38 ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
40 ccs = dbf.WeightedStrings([], [])
42 class _PoiData(dbf.SearchData):
44 qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
47 return dbs.PoiSearch(_PoiData())
51 """ Build the abstract search queries from token assignments.
54 def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
56 self.details = details
def configured_for_country(self) -> bool:
    """ Tell whether the search details permit country results.

        This is the case when rank 4 lies between the configured
        minimum and maximum rank and the address layer is enabled.
    """
    details = self.details
    # Rank window must include rank 4; checked before the layer lookup.
    if not (details.min_rank <= 4 and details.max_rank >= 4):
        return False
    return details.layer_enabled(DataLayer.ADDRESS)
def configured_for_postcode(self) -> bool:
    """ Tell whether the search details permit postcode results.

        Requires a minimum rank of at most 5, a maximum rank of at
        least 11 and an enabled address layer.
    """
    details = self.details
    rank_window_ok = details.min_rank <= 5 and details.max_rank >= 11
    # The layer is only consulted once the rank window matches.
    return rank_window_ok and details.layer_enabled(DataLayer.ADDRESS)
def configured_for_housenumbers(self) -> bool:
    """ Tell whether the search details permit address results.

        Requires a maximum rank of at least 30 and an enabled
        address layer.
    """
    details = self.details
    if not details.max_rank >= 30:
        return False
    return details.layer_enabled(DataLayer.ADDRESS)
86 def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
87 """ Yield all possible abstract searches for the given token assignment.
89 sdata = self.get_search_data(assignment)
93 near_items = self.get_near_items(assignment)
94 if near_items is not None and not near_items:
95 return # impossible combination of near items and category parameter
97 if assignment.name is None:
98 if near_items and not sdata.postcodes:
99 sdata.qualifiers = near_items
101 builder = self.build_poi_search(sdata)
102 elif assignment.housenumber:
103 hnr_tokens = self.query.get_tokens(assignment.housenumber,
104 TokenType.HOUSENUMBER)
105 builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
107 builder = self.build_special_search(sdata, assignment.address,
110 builder = self.build_name_search(sdata, assignment.name, assignment.address,
114 penalty = min(near_items.penalties)
115 near_items.penalties = [p - penalty for p in near_items.penalties]
116 for search in builder:
117 search_penalty = search.penalty
119 yield dbs.NearSearch(penalty + assignment.penalty + search_penalty,
122 for search in builder:
123 search.penalty += assignment.penalty
def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
    """ Yield the abstract search for a plain category lookup.

        A search is only produced when the search data carries no
        housenumbers and the details supply a geographic restriction:
        either a viewbox together with the bounded-viewbox flag, or
        a near point.
    """
    if sdata.housenumbers:
        return

    details = self.details
    geo_restricted = (details.viewbox and details.bounded_viewbox) or details.near
    if geo_restricted:
        yield dbs.PoiSearch(sdata)
136 def build_special_search(self, sdata: dbf.SearchData,
137 address: List[TokenRange],
138 is_category: bool) -> Iterator[dbs.AbstractSearch]:
139 """ Build abstract search queries for searches that do not involve
143 # No special searches over qualifiers supported.
146 if sdata.countries and not address and not sdata.postcodes \
147 and self.configured_for_country:
148 yield dbs.CountrySearch(sdata)
150 if sdata.postcodes and (is_category or self.configured_for_postcode):
151 penalty = 0.0 if sdata.countries else 0.1
153 sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
154 [t.token for r in address
155 for t in self.query.get_partials_list(r)],
158 yield dbs.PostcodeSearch(penalty, sdata)
161 def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
162 address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
163 """ Build a simple address search for special entries where the
164 housenumber is the main name token.
166 sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], lookups.LookupAny)]
167 expected_count = sum(t.count for t in hnrs)
169 partials = {t.token: t.count for trange in address
170 for t in self.query.get_partials_list(trange)}
172 if expected_count < 8000:
173 sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
174 list(partials), lookups.Restrict))
175 elif len(partials) != 1 or list(partials.values())[0] < 10000:
176 sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
177 list(partials), lookups.LookupAll))
179 sdata.lookups.append(
180 dbf.FieldLookup('nameaddress_vector',
182 in self.query.get_tokens(address[0], TokenType.WORD)],
185 sdata.housenumbers = dbf.WeightedStrings([], [])
186 yield dbs.PlaceSearch(0.05, sdata, expected_count)
189 def build_name_search(self, sdata: dbf.SearchData,
190 name: TokenRange, address: List[TokenRange],
191 is_category: bool) -> Iterator[dbs.AbstractSearch]:
192 """ Build abstract search queries for simple name or address searches.
194 if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
195 ranking = self.get_name_ranking(name)
196 name_penalty = ranking.normalize_penalty()
198 sdata.rankings.append(ranking)
199 for penalty, count, lookup in self.yield_lookups(name, address):
200 sdata.lookups = lookup
201 yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
204 def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
205 -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
206 """ Yield all variants how the given name and address should best
207 be searched for. This takes into account how frequent the terms
208 are and tries to find a lookup that optimizes index use.
210 penalty = 0.0 # extra penalty
211 name_partials = {t.token: t for t in self.query.get_partials_list(name)}
213 addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
214 addr_tokens = list({t.token for t in addr_partials})
216 partials_indexed = all(t.is_indexed for t in name_partials.values()) \
217 and all(t.is_indexed for t in addr_partials)
218 exp_count = min(t.count for t in name_partials.values()) / (2**(len(name_partials) - 1))
220 if (len(name_partials) > 3 or exp_count < 8000) and partials_indexed:
221 yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
224 # Partial term too frequent. Try looking up by rare full names first.
225 name_fulls = self.query.get_tokens(name, TokenType.WORD)
227 fulls_count = sum(t.count for t in name_fulls)
228 # At this point drop unindexed partials from the address.
229 # This might yield wrong results, nothing we can do about that.
230 if not partials_indexed:
231 addr_tokens = [t.token for t in addr_partials if t.is_indexed]
232 penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
233 # Any of the full names applies with all of the partials from the address
234 yield penalty, fulls_count / (2**len(addr_tokens)),\
235 dbf.lookup_by_any_name([t.token for t in name_fulls],
237 fulls_count > 30000 / max(1, len(addr_tokens)))
239 # To catch remaining results, lookup by name and address
240 # We only do this if there is a reasonable number of results expected.
241 exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
242 if exp_count < 10000 and all(t.is_indexed for t in name_partials.values()):
243 lookup = [dbf.FieldLookup('name_vector', list(name_partials.keys()), lookups.LookupAll)]
245 lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll))
246 penalty += 0.35 * max(1 if name_fulls else 0.1,
247 5 - len(name_partials) - len(addr_tokens))
248 yield penalty, exp_count, lookup
def get_name_ranking(self, trange: TokenRange,
                     db_field: str = 'name_vector') -> dbf.FieldRanking:
    """ Build the ranking expression for the name term in the given range.

        Each full-word token in the range becomes one ranked entry that
        carries the token's own penalty; entries are ordered by ascending
        penalty. The default penalty of the ranking is the summed penalty
        of the partial tokens in the range plus 0.2.
    """
    ranked = sorted((dbf.RankedTokens(tok.penalty, [tok.token])
                     for tok in self.query.get_tokens(trange, TokenType.WORD)),
                    key=lambda entry: entry.penalty)
    # Fallback when no full word matches: sum of the partial penalties.
    fallback_penalty = sum(part.penalty
                           for part in self.query.get_partials_list(trange)) + 0.2
    return dbf.FieldRanking(db_field, fallback_penalty, ranked)
264 def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
265 """ Create a list of ranking expressions for an address term
266 for the given ranges.
268 todo: List[Tuple[int, int, dbf.RankedTokens]] = []
269 heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
270 ranks: List[dbf.RankedTokens] = []
272 while todo: # pylint: disable=too-many-nested-blocks
273 neglen, pos, rank = heapq.heappop(todo)
274 for tlist in self.query.nodes[pos].starting:
275 if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
276 if tlist.end < trange.end:
277 chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
278 if tlist.ttype == TokenType.PARTIAL:
279 penalty = rank.penalty + chgpenalty \
280 + max(t.penalty for t in tlist.tokens)
281 heapq.heappush(todo, (neglen - 1, tlist.end,
282 dbf.RankedTokens(penalty, rank.tokens)))
284 for t in tlist.tokens:
285 heapq.heappush(todo, (neglen - 1, tlist.end,
286 rank.with_token(t, chgpenalty)))
287 elif tlist.end == trange.end:
288 if tlist.ttype == TokenType.PARTIAL:
289 ranks.append(dbf.RankedTokens(rank.penalty
290 + max(t.penalty for t in tlist.tokens),
293 ranks.extend(rank.with_token(t, 0.0) for t in tlist.tokens)
295 # Too many variants, bail out and only add
296 # Worst-case Fallback: sum of penalty of partials
297 name_partials = self.query.get_partials_list(trange)
298 default = sum(t.penalty for t in name_partials) + 0.2
299 ranks.append(dbf.RankedTokens(rank.penalty + default, []))
300 # Bail out of outer loop
304 ranks.sort(key=lambda r: len(r.tokens))
305 default = ranks[0].penalty + 0.3
307 ranks.sort(key=lambda r: r.penalty)
309 return dbf.FieldRanking('nameaddress_vector', default, ranks)
312 def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
313 """ Collect the tokens for the non-name search fields in the
316 sdata = dbf.SearchData()
317 sdata.penalty = assignment.penalty
318 if assignment.country:
319 tokens = self.get_country_tokens(assignment.country)
322 sdata.set_strings('countries', tokens)
323 elif self.details.countries:
324 sdata.countries = dbf.WeightedStrings(self.details.countries,
325 [0.0] * len(self.details.countries))
326 if assignment.housenumber:
327 sdata.set_strings('housenumbers',
328 self.query.get_tokens(assignment.housenumber,
329 TokenType.HOUSENUMBER))
330 if assignment.postcode:
331 sdata.set_strings('postcodes',
332 self.query.get_tokens(assignment.postcode,
334 if assignment.qualifier:
335 tokens = self.get_qualifier_tokens(assignment.qualifier)
338 sdata.set_qualifiers(tokens)
339 elif self.details.categories:
340 sdata.qualifiers = dbf.WeightedCategories(self.details.categories,
341 [0.0] * len(self.details.categories))
343 if assignment.address:
344 if not assignment.name and assignment.housenumber:
345 # housenumber search: the first item needs to be handled like
346 # a name in ranking or penalties are not comparable with
348 sdata.set_ranking([self.get_name_ranking(assignment.address[0],
349 db_field='nameaddress_vector')]
350 + [self.get_addr_ranking(r) for r in assignment.address[1:]])
352 sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
359 def get_country_tokens(self, trange: TokenRange) -> List[Token]:
360 """ Return the list of country tokens for the given range,
361 optionally filtered by the country list from the details
364 tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
365 if self.details.countries:
366 tokens = [t for t in tokens if t.lookup_word in self.details.countries]
371 def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
372 """ Return the list of qualifier tokens for the given range,
373 optionally filtered by the qualifier list from the details
376 tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
377 if self.details.categories:
378 tokens = [t for t in tokens if t.get_category() in self.details.categories]
383 def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
384 """ Collect tokens for near items search or use the categories
385 requested per parameter.
386 Returns None if no category search is requested.
388 if assignment.near_item:
389 tokens: Dict[Tuple[str, str], float] = {}
390 for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM):
391 cat = t.get_category()
392 # The category of a near search will be that of near_item.
393 # Thus, if search is restricted to a category parameter,
394 # the two sets must intersect.
395 if (not self.details.categories or cat in self.details.categories)\
396 and t.penalty < tokens.get(cat, 1000.0):
397 tokens[cat] = t.penalty
398 return dbf.WeightedCategories(list(tokens.keys()), list(tokens.values()))
403 PENALTY_WORDCHANGE = {
404 BreakType.START: 0.0,
406 BreakType.PHRASE: 0.0,