]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/core.py
enable API use with psycopg 3
[nominatim.git] / nominatim / api / core.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Implementation of classes for API access via libraries.
9 """
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence
11 import asyncio
12 import contextlib
13 from pathlib import Path
14
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
17
18
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
21 from nominatim.config import Configuration
22 from nominatim.api.connection import SearchConnection
23 from nominatim.api.status import get_status, StatusResult
24 from nominatim.api.lookup import get_detailed_place, get_simple_place
25 from nominatim.api.reverse import ReverseGeocoder
26 from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
27 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
28
29
30 class NominatimAPIAsync:
31     """ API loader asynchornous version.
32     """
33     def __init__(self, project_dir: Path,
34                  environ: Optional[Mapping[str, str]] = None) -> None:
35         self.config = Configuration(project_dir, environ)
36         self.server_version = 0
37
38         self._engine_lock = asyncio.Lock()
39         self._engine: Optional[sa_asyncio.AsyncEngine] = None
40         self._tables: Optional[SearchTables] = None
41         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
42
43
44     async def setup_database(self) -> None:
45         """ Set up the engine and connection parameters.
46
47             This function will be implicitly called when the database is
48             accessed for the first time. You may also call it explicitly to
49             avoid that the first call is delayed by the setup.
50         """
51         async with self._engine_lock:
52             if self._engine:
53                 return
54
55             dsn = self.config.get_database_params()
56
57             query = {k: v for k, v in dsn.items()
58                       if k not in ('user', 'password', 'dbname', 'host', 'port')}
59             if PGCORE_LIB == 'asyncpg':
60                 query['prepared_statement_cache_size'] = '0'
61
62             dburl = sa.engine.URL.create(
63                        f'postgresql+{PGCORE_LIB}',
64                        database=dsn.get('dbname'),
65                        username=dsn.get('user'), password=dsn.get('password'),
66                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
67                        query=query)
68             engine = sa_asyncio.create_async_engine(dburl, future=True)
69
70             try:
71                 async with engine.begin() as conn:
72                     result = await conn.scalar(sa.text('SHOW server_version_num'))
73                     server_version = int(result)
74             except (PGCORE_ERROR, sa.exc.OperationalError):
75                 server_version = 0
76
77             if server_version >= 110000:
78                 @sa.event.listens_for(engine.sync_engine, "connect")
79                 def _on_connect(dbapi_con: Any, _: Any) -> None:
80                     cursor = dbapi_con.cursor()
81                     cursor.execute("SET jit_above_cost TO '-1'")
82                     cursor.execute("SET max_parallel_workers_per_gather TO '0'")
83                 # Make sure that all connections get the new settings
84                 await self.close()
85
86             self._property_cache['DB:server_version'] = server_version
87
88             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
89             self._engine = engine
90
91
92     async def close(self) -> None:
93         """ Close all active connections to the database. The NominatimAPIAsync
94             object remains usable after closing. If a new API functions is
95             called, new connections are created.
96         """
97         if self._engine is not None:
98             await self._engine.dispose()
99
100
101     @contextlib.asynccontextmanager
102     async def begin(self) -> AsyncIterator[SearchConnection]:
103         """ Create a new connection with automatic transaction handling.
104
105             This function may be used to get low-level access to the database.
106             Refer to the documentation of SQLAlchemy for details how to use
107             the connection object.
108         """
109         if self._engine is None:
110             await self.setup_database()
111
112         assert self._engine is not None
113         assert self._tables is not None
114
115         async with self._engine.begin() as conn:
116             yield SearchConnection(conn, self._tables, self._property_cache)
117
118
119     async def status(self) -> StatusResult:
120         """ Return the status of the database.
121         """
122         try:
123             async with self.begin() as conn:
124                 status = await get_status(conn)
125         except (PGCORE_ERROR, sa.exc.OperationalError):
126             return StatusResult(700, 'Database connection failed')
127
128         return status
129
130
131     async def details(self, place: PlaceRef,
132                       details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
133         """ Get detailed information about a place in the database.
134
135             Returns None if there is no entry under the given ID.
136         """
137         async with self.begin() as conn:
138             return await get_detailed_place(conn, place, details or LookupDetails())
139
140
141     async def lookup(self, places: Sequence[PlaceRef],
142                       details: Optional[LookupDetails] = None) -> SearchResults:
143         """ Get simple information about a list of places.
144
145             Returns a list of place information for all IDs that were found.
146         """
147         if details is None:
148             details = LookupDetails()
149         async with self.begin() as conn:
150             return SearchResults(filter(None,
151                                         [await get_simple_place(conn, p, details) for p in places]))
152
153
154     async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
155                       layer: Optional[DataLayer] = None,
156                       details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
157         """ Find a place by its coordinates. Also known as reverse geocoding.
158
159             Returns the closest result that can be found or None if
160             no place matches the given criteria.
161         """
162         # The following negation handles NaN correctly. Don't change.
163         if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
164             # There are no results to be expected outside valid coordinates.
165             return None
166
167         if layer is None:
168             layer = DataLayer.ADDRESS | DataLayer.POI
169
170         max_rank = max(0, min(max_rank or 30, 30))
171
172         async with self.begin() as conn:
173             geocoder = ReverseGeocoder(conn, max_rank, layer,
174                                        details or LookupDetails())
175             return await geocoder.lookup(coord)
176
177
178 class NominatimAPI:
179     """ API loader, synchronous version.
180     """
181
182     def __init__(self, project_dir: Path,
183                  environ: Optional[Mapping[str, str]] = None) -> None:
184         self._loop = asyncio.new_event_loop()
185         self._async_api = NominatimAPIAsync(project_dir, environ)
186
187
188     def close(self) -> None:
189         """ Close all active connections to the database. The NominatimAPIAsync
190             object remains usable after closing. If a new API functions is
191             called, new connections are created.
192         """
193         self._loop.run_until_complete(self._async_api.close())
194         self._loop.close()
195
196
197     @property
198     def config(self) -> Configuration:
199         """ Return the configuration used by the API.
200         """
201         return self._async_api.config
202
203     def status(self) -> StatusResult:
204         """ Return the status of the database.
205         """
206         return self._loop.run_until_complete(self._async_api.status())
207
208
209     def details(self, place: PlaceRef,
210                 details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
211         """ Get detailed information about a place in the database.
212         """
213         return self._loop.run_until_complete(self._async_api.details(place, details))
214
215
216     def lookup(self, places: Sequence[PlaceRef],
217                details: Optional[LookupDetails] = None) -> SearchResults:
218         """ Get simple information about a list of places.
219
220             Returns a list of place information for all IDs that were found.
221         """
222         return self._loop.run_until_complete(self._async_api.lookup(places, details))
223
224
225     def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
226                 layer: Optional[DataLayer] = None,
227                 details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
228         """ Find a place by its coordinates. Also known as reverse geocoding.
229
230             Returns the closest result that can be found or None if
231             no place matches the given criteria.
232         """
233         return self._loop.run_until_complete(
234                    self._async_api.reverse(coord, max_rank, layer, details))