]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/search/query.py
Merge pull request #3117 from lonvia/fix-assorted-search-errors
[nominatim.git] / nominatim / api / search / query.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Datastructures for a tokenized query.
9 """
10 from typing import List, Tuple, Optional, Iterator
11 from abc import ABC, abstractmethod
12 import dataclasses
13 import enum
14
15 class BreakType(enum.Enum):
16     """ Type of break between tokens.
17     """
18     START = '<'
19     """ Begin of the query. """
20     END = '>'
21     """ End of the query. """
22     PHRASE = ','
23     """ Break between two phrases. """
24     WORD = ' '
25     """ Break between words. """
26     PART = '-'
27     """ Break inside a word, for example a hyphen or apostrophe. """
28     TOKEN = '`'
29     """ Break created as a result of tokenization.
30         This may happen in languages without spaces between words.
31     """
32
33
34 class TokenType(enum.Enum):
35     """ Type of token.
36     """
37     WORD = enum.auto()
38     """ Full name of a place. """
39     PARTIAL = enum.auto()
40     """ Word term without breaks, does not necessarily represent a full name. """
41     HOUSENUMBER = enum.auto()
42     """ Housenumber term. """
43     POSTCODE = enum.auto()
44     """ Postal code term. """
45     COUNTRY = enum.auto()
46     """ Country name or reference. """
47     QUALIFIER = enum.auto()
48     """ Special term used together with name (e.g. _Hotel_ Bellevue). """
49     CATEGORY = enum.auto()
50     """ Special term used as searchable object(e.g. supermarket in ...). """
51
52
53 class PhraseType(enum.Enum):
54     """ Designation of a phrase.
55     """
56     NONE = 0
57     """ No specific designation (i.e. source is free-form query). """
58     AMENITY = enum.auto()
59     """ Contains name or type of a POI. """
60     STREET = enum.auto()
61     """ Contains a street name optionally with a housenumber. """
62     CITY = enum.auto()
63     """ Contains the postal city. """
64     COUNTY = enum.auto()
65     """ Contains the equivalent of a county. """
66     STATE = enum.auto()
67     """ Contains a state or province. """
68     POSTCODE = enum.auto()
69     """ Contains a postal code. """
70     COUNTRY = enum.auto()
71     """ Contains the country name or code. """
72
73     def compatible_with(self, ttype: TokenType) -> bool:
74         """ Check if the given token type can be used with the phrase type.
75         """
76         if self == PhraseType.NONE:
77             return True
78         if self == PhraseType.AMENITY:
79             return ttype in (TokenType.WORD, TokenType.PARTIAL,
80                              TokenType.QUALIFIER, TokenType.CATEGORY)
81         if self == PhraseType.STREET:
82             return ttype in (TokenType.WORD, TokenType.PARTIAL, TokenType.HOUSENUMBER)
83         if self == PhraseType.POSTCODE:
84             return ttype == TokenType.POSTCODE
85         if self == PhraseType.COUNTRY:
86             return ttype == TokenType.COUNTRY
87
88         return ttype in (TokenType.WORD, TokenType.PARTIAL)
89
90
91 @dataclasses.dataclass
92 class Token(ABC):
93     """ Base type for tokens.
94         Specific query analyzers must implement the concrete token class.
95     """
96
97     penalty: float
98     token: int
99     count: int
100     lookup_word: str
101     is_indexed: bool
102
103
104     @abstractmethod
105     def get_category(self) -> Tuple[str, str]:
106         """ Return the category restriction for qualifier terms and
107             category objects.
108         """
109
110 @dataclasses.dataclass
111 class TokenRange:
112     """ Indexes of query nodes over which a token spans.
113     """
114     start: int
115     end: int
116
117     def __lt__(self, other: 'TokenRange') -> bool:
118         return self.end <= other.start
119
120
121     def __le__(self, other: 'TokenRange') -> bool:
122         return NotImplemented
123
124
125     def __gt__(self, other: 'TokenRange') -> bool:
126         return self.start >= other.end
127
128
129     def __ge__(self, other: 'TokenRange') -> bool:
130         return NotImplemented
131
132
133     def replace_start(self, new_start: int) -> 'TokenRange':
134         """ Return a new token range with the new start.
135         """
136         return TokenRange(new_start, self.end)
137
138
139     def replace_end(self, new_end: int) -> 'TokenRange':
140         """ Return a new token range with the new end.
141         """
142         return TokenRange(self.start, new_end)
143
144
145     def split(self, index: int) -> Tuple['TokenRange', 'TokenRange']:
146         """ Split the span into two spans at the given index.
147             The index must be within the span.
148         """
149         return self.replace_end(index), self.replace_start(index)
150
151
152 @dataclasses.dataclass
153 class TokenList:
154     """ List of all tokens of a given type going from one breakpoint to another.
155     """
156     end: int
157     ttype: TokenType
158     tokens: List[Token]
159
160
161     def add_penalty(self, penalty: float) -> None:
162         """ Add the given penalty to all tokens in the list.
163         """
164         for token in self.tokens:
165             token.penalty += penalty
166
167
168 @dataclasses.dataclass
169 class QueryNode:
170     """ A node of the querry representing a break between terms.
171     """
172     btype: BreakType
173     ptype: PhraseType
174     starting: List[TokenList] = dataclasses.field(default_factory=list)
175
176     def has_tokens(self, end: int, *ttypes: TokenType) -> bool:
177         """ Check if there are tokens of the given types ending at the
178             given node.
179         """
180         return any(tl.end == end and tl.ttype in ttypes for tl in self.starting)
181
182
183     def get_tokens(self, end: int, ttype: TokenType) -> Optional[List[Token]]:
184         """ Get the list of tokens of the given type starting at this node
185             and ending at the node 'end'. Returns 'None' if no such
186             tokens exist.
187         """
188         for tlist in self.starting:
189             if tlist.end == end and tlist.ttype == ttype:
190                 return tlist.tokens
191         return None
192
193
194 @dataclasses.dataclass
195 class Phrase:
196     """ A normalized query part. Phrases may be typed which means that
197         they then represent a specific part of the address.
198     """
199     ptype: PhraseType
200     text: str
201
202
203 class QueryStruct:
204     """ A tokenized search query together with the normalized source
205         from which the tokens have been parsed.
206
207         The query contains a list of nodes that represent the breaks
208         between words. Tokens span between nodes, which don't necessarily
209         need to be direct neighbours. Thus the query is represented as a
210         directed acyclic graph.
211
212         When created, a query contains a single node: the start of the
213         query. Further nodes can be added by appending to 'nodes'.
214     """
215
216     def __init__(self, source: List[Phrase]) -> None:
217         self.source = source
218         self.nodes: List[QueryNode] = \
219             [QueryNode(BreakType.START, source[0].ptype if source else PhraseType.NONE)]
220
221
222     def num_token_slots(self) -> int:
223         """ Return the length of the query in vertice steps.
224         """
225         return len(self.nodes) - 1
226
227
228     def add_node(self, btype: BreakType, ptype: PhraseType) -> None:
229         """ Append a new break node with the given break type.
230             The phrase type denotes the type for any tokens starting
231             at the node.
232         """
233         self.nodes.append(QueryNode(btype, ptype))
234
235
236     def add_token(self, trange: TokenRange, ttype: TokenType, token: Token) -> None:
237         """ Add a token to the query. 'start' and 'end' are the indexes of the
238             nodes from which to which the token spans. The indexes must exist
239             and are expected to be in the same phrase.
240             'ttype' denotes the type of the token and 'token' the token to
241             be inserted.
242
243             If the token type is not compatible with the phrase it should
244             be added to, then the token is silently dropped.
245         """
246         snode = self.nodes[trange.start]
247         if snode.ptype.compatible_with(ttype):
248             tlist = snode.get_tokens(trange.end, ttype)
249             if tlist is None:
250                 snode.starting.append(TokenList(trange.end, ttype, [token]))
251             else:
252                 tlist.append(token)
253
254
255     def get_tokens(self, trange: TokenRange, ttype: TokenType) -> List[Token]:
256         """ Get the list of tokens of a given type, spanning the given
257             nodes. The nodes must exist. If no tokens exist, an
258             empty list is returned.
259         """
260         return self.nodes[trange.start].get_tokens(trange.end, ttype) or []
261
262
263     def get_partials_list(self, trange: TokenRange) -> List[Token]:
264         """ Create a list of partial tokens between the given nodes.
265             The list is composed of the first token of type PARTIAL
266             going to the subsequent node. Such PARTIAL tokens are
267             assumed to exist.
268         """
269         return [next(iter(self.get_tokens(TokenRange(i, i+1), TokenType.PARTIAL)))
270                           for i in range(trange.start, trange.end)]
271
272
273     def iter_token_lists(self) -> Iterator[Tuple[int, QueryNode, TokenList]]:
274         """ Iterator over all token lists in the query.
275         """
276         for i, node in enumerate(self.nodes):
277             for tlist in node.starting:
278                 yield i, node, tlist
279
280
281     def find_lookup_word_by_id(self, token: int) -> str:
282         """ Find the first token with the given token ID and return
283             its lookup word. Returns 'None' if no such token exists.
284             The function is very slow and must only be used for
285             debugging.
286         """
287         for node in self.nodes:
288             for tlist in node.starting:
289                 for t in tlist.tokens:
290                     if t.token == token:
291                         return f"[{tlist.ttype.name[0]}]{t.lookup_word}"
292         return 'None'