]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/v1/helpers.py
apply request timeout also while waiting for a connection from pool
[nominatim.git] / src / nominatim_api / v1 / helpers.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Helper function for parsing parameters and and outputting data
9 specifically for the v1 version of the API.
10 """
11 from typing import Tuple, Optional, Any, Dict, Iterable
12 from itertools import chain
13 import re
14
15 from ..results import SearchResult, SearchResults, SourceTable
16 from ..types import SearchDetails, GeometryFormat
17
18
19 REVERSE_MAX_RANKS = [2, 2, 2,   # 0-2   Continent/Sea
20                      4, 4,      # 3-4   Country
21                      8,         # 5     State
22                      10, 10,    # 6-7   Region
23                      12, 12,    # 8-9   County
24                      16, 17,    # 10-11 City
25                      18,        # 12    Town
26                      19,        # 13    Village/Suburb
27                      22,        # 14    Hamlet/Neighbourhood
28                      25,        # 15    Localities
29                      26,        # 16    Major Streets
30                      27,        # 17    Minor Streets
31                      30         # 18    Building
32                      ]
33
34
35 def zoom_to_rank(zoom: int) -> int:
36     """ Convert a zoom parameter into a rank according to the v1 API spec.
37     """
38     return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
39
40
41 FEATURE_TYPE_TO_RANK: Dict[Optional[str], Tuple[int, int]] = {
42     'country': (4, 4),
43     'state': (8, 8),
44     'city': (14, 16),
45     'settlement': (8, 20)
46 }
47
48
49 def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
50     """ Convert a feature type parameter to a tuple of
51         feature type name, minimum rank and maximum rank.
52     """
53     return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
54
55
56 def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
57                        feature_type: Optional[str],
58                        namedetails: bool, extratags: bool,
59                        excluded: Iterable[str]) -> None:
60     """ Add parameters from details dictionary to the query parts
61         dictionary which is suitable as URL parameter dictionary.
62     """
63     parsed = SearchDetails.from_kwargs(details)
64     if parsed.geometry_output != GeometryFormat.NONE:
65         if GeometryFormat.GEOJSON & parsed.geometry_output:
66             queryparts['polygon_geojson'] = '1'
67         if GeometryFormat.KML & parsed.geometry_output:
68             queryparts['polygon_kml'] = '1'
69         if GeometryFormat.SVG & parsed.geometry_output:
70             queryparts['polygon_svg'] = '1'
71         if GeometryFormat.TEXT & parsed.geometry_output:
72             queryparts['polygon_text'] = '1'
73     if parsed.address_details:
74         queryparts['addressdetails'] = '1'
75     if parsed.entrances:
76         queryparts['entrances'] = '1'
77     if namedetails:
78         queryparts['namedetails'] = '1'
79     if extratags:
80         queryparts['extratags'] = '1'
81     if parsed.geometry_simplification > 0.0:
82         queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
83     if parsed.max_results != 10:
84         queryparts['limit'] = str(parsed.max_results)
85     if parsed.countries:
86         queryparts['countrycodes'] = ','.join(parsed.countries)
87     queryparts['exclude_place_ids'] = \
88         ','.join(chain(excluded, map(str, (e for e in parsed.excluded if e > 0))))
89     if parsed.viewbox:
90         queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
91     if parsed.bounded_viewbox:
92         queryparts['bounded'] = '1'
93     if not details['dedupe']:
94         queryparts['dedupe'] = '0'
95     if feature_type in FEATURE_TYPE_TO_RANK:
96         queryparts['featureType'] = feature_type
97
98
99 def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
100     """ Remove results that look like duplicates.
101
102         Two results are considered the same if they have the same OSM ID
103         or if they have the same category, display name and rank.
104     """
105     osm_ids_done = set()
106     classification_done = set()
107     deduped = SearchResults()
108     for result in results:
109         if result.source_table == SourceTable.POSTCODE:
110             assert result.names and 'ref' in result.names
111             if any(_is_postcode_relation_for(r, result.names['ref']) for r in results):
112                 continue
113         if result.source_table == SourceTable.PLACEX:
114             classification = (result.osm_object[0] if result.osm_object else None,
115                               result.category,
116                               result.display_name,
117                               result.rank_address)
118             if result.osm_object not in osm_ids_done \
119                and classification not in classification_done:
120                 deduped.append(result)
121             osm_ids_done.add(result.osm_object)
122             classification_done.add(classification)
123         else:
124             deduped.append(result)
125         if len(deduped) >= max_results:
126             break
127
128     return deduped
129
130
131 def _is_postcode_relation_for(result: SearchResult, postcode: str) -> bool:
132     return result.source_table == SourceTable.PLACEX \
133            and result.osm_object is not None \
134            and result.osm_object[0] == 'R' \
135            and result.category == ('boundary', 'postal_code') \
136            and result.names is not None \
137            and result.names.get('ref') == postcode
138
139
140 def _deg(axis: str) -> str:
141     return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"
142
143
144 def _deg_min(axis: str) -> str:
145     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)[′']*"
146
147
148 def _deg_min_sec(axis: str) -> str:
149     return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)[\"″]*"
150
151
152 COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
153     r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
154     _deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
155     r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
156     _deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
157     r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
158     _deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
159     r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
160 )]
161
162
163 def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
164     """ Look for something that is formatted like a coordinate at the
165         beginning or end of the query. If found, extract the coordinate and
166         return the remaining query (or the empty string if the query
167         consisted of nothing but a coordinate).
168
169         Only the first match will be returned.
170     """
171     for regex in COORD_REGEX:
172         match = regex.fullmatch(query)
173         if match is None:
174             continue
175         groups = match.groupdict()
176         if not groups['pre'] or not groups['post']:
177             x = float(groups['lon_deg']) \
178                 + float(groups.get('lon_min', 0.0)) / 60.0 \
179                 + float(groups.get('lon_sec', 0.0)) / 3600.0
180             if groups.get('ew') == 'W':
181                 x = -x
182             y = float(groups['lat_deg']) \
183                 + float(groups.get('lat_min', 0.0)) / 60.0 \
184                 + float(groups.get('lat_sec', 0.0)) / 3600.0
185             if groups.get('ns') == 'S':
186                 y = -y
187             return groups['pre'] or groups['post'] or '', x, y
188
189     return query, None, None
190
191
192 CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')
193
194
195 def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
196     """ Extract a hidden category specification of the form '[key=value]' from
197         the query. If found, extract key and value  and
198         return the remaining query (or the empty string if the query
199         consisted of nothing but a category).
200
201         Only the first match will be returned.
202     """
203     match = CATEGORY_REGEX.search(query)
204     if match is not None:
205         return (match.group('pre').strip() + ' ' + match.group('post').strip()).strip(), \
206                match.group('cls'), match.group('typ')
207
208     return query, None, None