1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Create query interpretations where each vertice in the query is assigned
9 a specific function (expressed as a token type).
11 from typing import Optional, List, Iterator
14 import nominatim.api.search.query as qmod
15 from nominatim.api.logging import log
17 # pylint: disable=too-many-return-statements,too-many-branches
19 @dataclasses.dataclass
21 """ A token range for a specific type of tokens.
24 trange: qmod.TokenRange
27 PENALTY_TOKENCHANGE = {
28 qmod.BreakType.START: 0.0,
29 qmod.BreakType.END: 0.0,
30 qmod.BreakType.PHRASE: 0.0,
31 qmod.BreakType.WORD: 0.1,
32 qmod.BreakType.PART: 0.2,
33 qmod.BreakType.TOKEN: 0.4
36 TypedRangeSeq = List[TypedRange]
38 @dataclasses.dataclass
39 class TokenAssignment: # pylint: disable=too-many-instance-attributes
40 """ Representation of a possible assignment of token types
41 to the tokens in a tokenized query.
44 name: Optional[qmod.TokenRange] = None
45 address: List[qmod.TokenRange] = dataclasses.field(default_factory=list)
46 housenumber: Optional[qmod.TokenRange] = None
47 postcode: Optional[qmod.TokenRange] = None
48 country: Optional[qmod.TokenRange] = None
49 near_item: Optional[qmod.TokenRange] = None
50 qualifier: Optional[qmod.TokenRange] = None
54 def from_ranges(ranges: TypedRangeSeq) -> 'TokenAssignment':
55 """ Create a new token assignment from a sequence of typed spans.
57 out = TokenAssignment()
59 if token.ttype == qmod.TokenType.PARTIAL:
60 out.address.append(token.trange)
61 elif token.ttype == qmod.TokenType.HOUSENUMBER:
62 out.housenumber = token.trange
63 elif token.ttype == qmod.TokenType.POSTCODE:
64 out.postcode = token.trange
65 elif token.ttype == qmod.TokenType.COUNTRY:
66 out.country = token.trange
67 elif token.ttype == qmod.TokenType.NEAR_ITEM:
68 out.near_item = token.trange
69 elif token.ttype == qmod.TokenType.QUALIFIER:
70 out.qualifier = token.trange
75 """ Working state used to put together the token assignments.
77 Represents an intermediate state while traversing the tokenized
80 def __init__(self, seq: TypedRangeSeq,
81 direction: int = 0, penalty: float = 0.0) -> None:
83 self.direction = direction
84 self.penalty = penalty
87 def __str__(self) -> str:
88 seq = ''.join(f'[{r.trange.start} - {r.trange.end}: {r.ttype.name}]' for r in self.seq)
89 return f'{seq} (dir: {self.direction}, penalty: {self.penalty})'
93 def end_pos(self) -> int:
94 """ Return the index of the global end of the current sequence.
96 return self.seq[-1].trange.end if self.seq else 0
99 def has_types(self, *ttypes: qmod.TokenType) -> bool:
100 """ Check if the current sequence contains any typed ranges of
103 return any(s.ttype in ttypes for s in self.seq)
106 def is_final(self) -> bool:
107 """ Return true when the sequence cannot be extended by any
108 form of token anymore.
110 # Country and category must be the final term for left-to-right
111 return len(self.seq) > 1 and \
112 self.seq[-1].ttype in (qmod.TokenType.COUNTRY, qmod.TokenType.NEAR_ITEM)
115 def appendable(self, ttype: qmod.TokenType) -> Optional[int]:
116 """ Check if the give token type is appendable to the existing sequence.
118 Returns None if the token type is not appendable, otherwise the
119 new direction of the sequence after adding such a type. The
122 if ttype == qmod.TokenType.WORD:
126 # Append unconditionally to the empty list
127 if ttype == qmod.TokenType.COUNTRY:
129 if ttype in (qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
131 return self.direction
133 # Name tokens are always acceptable and don't change direction
134 if ttype == qmod.TokenType.PARTIAL:
135 # qualifiers cannot appear in the middle of the query. They need
136 # to be near the next phrase.
137 if self.direction == -1 \
138 and any(t.ttype == qmod.TokenType.QUALIFIER for t in self.seq[:-1]):
140 return self.direction
142 # Other tokens may only appear once
143 if self.has_types(ttype):
146 if ttype == qmod.TokenType.HOUSENUMBER:
147 if self.direction == 1:
148 if len(self.seq) == 1 and self.seq[0].ttype == qmod.TokenType.QUALIFIER:
150 if len(self.seq) > 2 \
151 or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
152 return None # direction left-to-right: housenumber must come before anything
153 elif self.direction == -1 \
154 or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
155 return -1 # force direction right-to-left if after other terms
157 return self.direction
159 if ttype == qmod.TokenType.POSTCODE:
160 if self.direction == -1:
161 if self.has_types(qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
164 if self.direction == 1:
165 return None if self.has_types(qmod.TokenType.COUNTRY) else 1
166 if self.has_types(qmod.TokenType.HOUSENUMBER, qmod.TokenType.QUALIFIER):
168 return self.direction
170 if ttype == qmod.TokenType.COUNTRY:
171 return None if self.direction == -1 else 1
173 if ttype == qmod.TokenType.NEAR_ITEM:
174 return self.direction
176 if ttype == qmod.TokenType.QUALIFIER:
177 if self.direction == 1:
178 if (len(self.seq) == 1
179 and self.seq[0].ttype in (qmod.TokenType.PARTIAL, qmod.TokenType.NEAR_ITEM)) \
180 or (len(self.seq) == 2
181 and self.seq[0].ttype == qmod.TokenType.NEAR_ITEM
182 and self.seq[1].ttype == qmod.TokenType.PARTIAL):
185 if self.direction == -1:
188 tempseq = self.seq[1:] if self.seq[0].ttype == qmod.TokenType.NEAR_ITEM else self.seq
189 if len(tempseq) == 0:
191 if len(tempseq) == 1 and self.seq[0].ttype == qmod.TokenType.HOUSENUMBER:
193 if len(tempseq) > 1 or self.has_types(qmod.TokenType.POSTCODE, qmod.TokenType.COUNTRY):
200 def advance(self, ttype: qmod.TokenType, end_pos: int,
201 btype: qmod.BreakType) -> Optional['_TokenSequence']:
202 """ Return a new token sequence state with the given token type
205 newdir = self.appendable(ttype)
210 newseq = [TypedRange(ttype, qmod.TokenRange(0, end_pos))]
214 if btype != qmod.BreakType.PHRASE and last.ttype == ttype:
215 # extend the existing range
216 newseq = self.seq[:-1] + [TypedRange(ttype, last.trange.replace_end(end_pos))]
220 newseq = list(self.seq) + [TypedRange(ttype,
221 qmod.TokenRange(last.trange.end, end_pos))]
222 new_penalty = PENALTY_TOKENCHANGE[btype]
224 return _TokenSequence(newseq, newdir, self.penalty + new_penalty)
227 def _adapt_penalty_from_priors(self, priors: int, new_dir: int) -> bool:
229 if self.direction == 0:
230 self.direction = new_dir
240 def recheck_sequence(self) -> bool:
241 """ Check that the sequence is a fully valid token assignment
242 and adapt direction and penalties further if necessary.
244 This function catches some impossible assignments that need
245 forward context and can therefore not be excluded when building
248 # housenumbers may not be further than 2 words from the beginning.
249 # If there are two words in front, give it a penalty.
250 hnrpos = next((i for i, tr in enumerate(self.seq)
251 if tr.ttype == qmod.TokenType.HOUSENUMBER),
253 if hnrpos is not None:
254 if self.direction != -1:
255 priors = sum(1 for t in self.seq[:hnrpos] if t.ttype == qmod.TokenType.PARTIAL)
256 if not self._adapt_penalty_from_priors(priors, -1):
258 if self.direction != 1:
259 priors = sum(1 for t in self.seq[hnrpos+1:] if t.ttype == qmod.TokenType.PARTIAL)
260 if not self._adapt_penalty_from_priors(priors, 1):
262 if any(t.ttype == qmod.TokenType.NEAR_ITEM for t in self.seq):
268 def _get_assignments_postcode(self, base: TokenAssignment,
269 query_len: int) -> Iterator[TokenAssignment]:
270 """ Yield possible assignments of Postcode searches with an
273 assert base.postcode is not None
275 if (base.postcode.start == 0 and self.direction != -1)\
276 or (base.postcode.end == query_len and self.direction != 1):
277 log().comment('postcode search')
278 # <address>,<postcode> should give preference to address search
279 if base.postcode.start == 0:
280 penalty = self.penalty
281 self.direction = -1 # name searches are only possible backwards
283 penalty = self.penalty + 0.1
284 self.direction = 1 # name searches are only possible forwards
285 yield dataclasses.replace(base, penalty=penalty)
288 def _get_assignments_address_forward(self, base: TokenAssignment,
289 query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
290 """ Yield possible assignments of address searches with
291 left-to-right reading.
293 first = base.address[0]
295 log().comment('first word = name')
296 yield dataclasses.replace(base, penalty=self.penalty,
297 name=first, address=base.address[1:])
300 # * if another name term comes after the first one and before the
302 # * a qualifier comes after the name
303 # * the containing phrase is strictly typed
304 if (base.housenumber and first.end < base.housenumber.start)\
305 or (base.qualifier and base.qualifier > first)\
306 or (query.nodes[first.start].ptype != qmod.PhraseType.NONE):
309 penalty = self.penalty
312 # * <name>, <street>, <housenumber> , ...
313 # * queries that are comma-separated
314 if (base.housenumber and base.housenumber > first) or len(query.source) > 1:
317 for i in range(first.start + 1, first.end):
318 name, addr = first.split(i)
319 log().comment(f'split first word = name ({i - first.start})')
320 yield dataclasses.replace(base, name=name, address=[addr] + base.address[1:],
321 penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
324 def _get_assignments_address_backward(self, base: TokenAssignment,
325 query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
326 """ Yield possible assignments of address searches with
327 right-to-left reading.
329 last = base.address[-1]
331 if self.direction == -1 or len(base.address) > 1:
332 log().comment('last word = name')
333 yield dataclasses.replace(base, penalty=self.penalty,
334 name=last, address=base.address[:-1])
337 # * if another name term comes before the last one and after the
339 # * a qualifier comes before the name
340 # * the containing phrase is strictly typed
341 if (base.housenumber and last.start > base.housenumber.end)\
342 or (base.qualifier and base.qualifier < last)\
343 or (query.nodes[last.start].ptype != qmod.PhraseType.NONE):
346 penalty = self.penalty
347 if base.housenumber and base.housenumber < last:
349 if len(query.source) > 1:
352 for i in range(last.start + 1, last.end):
353 addr, name = last.split(i)
354 log().comment(f'split last word = name ({i - last.start})')
355 yield dataclasses.replace(base, name=name, address=base.address[:-1] + [addr],
356 penalty=penalty + PENALTY_TOKENCHANGE[query.nodes[i].btype])
359 def get_assignments(self, query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
360 """ Yield possible assignments for the current sequence.
362 This function splits up general name assignments into name
363 and address and yields all possible variants of that.
365 base = TokenAssignment.from_ranges(self.seq)
367 num_addr_tokens = sum(t.end - t.start for t in base.address)
368 if num_addr_tokens > 50:
371 # Postcode search (postcode-only search is covered in next case)
372 if base.postcode is not None and base.address:
373 yield from self._get_assignments_postcode(base, query.num_token_slots())
375 # Postcode or country-only search
377 if not base.housenumber and (base.postcode or base.country or base.near_item):
378 log().comment('postcode/country search')
379 yield dataclasses.replace(base, penalty=self.penalty)
381 # <postcode>,<address> should give preference to postcode search
382 if base.postcode and base.postcode.start == 0:
385 # Right-to-left reading of the address
386 if self.direction != -1:
387 yield from self._get_assignments_address_forward(base, query)
389 # Left-to-right reading of the address
390 if self.direction != 1:
391 yield from self._get_assignments_address_backward(base, query)
393 # variant for special housenumber searches
394 if base.housenumber and not base.qualifier:
395 yield dataclasses.replace(base, penalty=self.penalty)
398 def yield_token_assignments(query: qmod.QueryStruct) -> Iterator[TokenAssignment]:
399 """ Return possible word type assignments to word positions.
401 The assignments are computed from the concrete tokens listed
402 in the tokenized query.
404 The result includes the penalty for transitions from one word type to
405 another. It does not include penalties for transitions within a
408 todo = [_TokenSequence([], direction=0 if query.source[0].ptype == qmod.PhraseType.NONE else 1)]
412 node = query.nodes[state.end_pos]
414 for tlist in node.starting:
415 newstate = state.advance(tlist.ttype, tlist.end, node.btype)
416 if newstate is not None:
417 if newstate.end_pos == query.num_token_slots():
418 if newstate.recheck_sequence():
419 log().var_dump('Assignment', newstate)
420 yield from newstate.get_assignments(query)
421 elif not newstate.is_final():
422 todo.append(newstate)