1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Implementation of classes for API access via libraries.
10 from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence
13 from pathlib import Path
15 import sqlalchemy as sa
16 import sqlalchemy.ext.asyncio as sa_asyncio
19 from nominatim.db.sqlalchemy_schema import SearchTables
20 from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
21 from nominatim.config import Configuration
22 from nominatim.api.connection import SearchConnection
23 from nominatim.api.status import get_status, StatusResult
24 from nominatim.api.lookup import get_detailed_place, get_simple_place
25 from nominatim.api.reverse import ReverseGeocoder
26 from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
27 from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
30 class NominatimAPIAsync:
31 """ API loader asynchornous version.
33 def __init__(self, project_dir: Path,
34 environ: Optional[Mapping[str, str]] = None) -> None:
35 self.config = Configuration(project_dir, environ)
36 self.server_version = 0
38 self._engine_lock = asyncio.Lock()
39 self._engine: Optional[sa_asyncio.AsyncEngine] = None
40 self._tables: Optional[SearchTables] = None
41 self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
44 async def setup_database(self) -> None:
45 """ Set up the engine and connection parameters.
47 This function will be implicitly called when the database is
48 accessed for the first time. You may also call it explicitly to
49 avoid that the first call is delayed by the setup.
51 async with self._engine_lock:
55 dsn = self.config.get_database_params()
57 query = {k: v for k, v in dsn.items()
58 if k not in ('user', 'password', 'dbname', 'host', 'port')}
59 if PGCORE_LIB == 'asyncpg':
60 query['prepared_statement_cache_size'] = '0'
62 dburl = sa.engine.URL.create(
63 f'postgresql+{PGCORE_LIB}',
64 database=dsn.get('dbname'),
65 username=dsn.get('user'), password=dsn.get('password'),
66 host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
68 engine = sa_asyncio.create_async_engine(dburl, future=True)
71 async with engine.begin() as conn:
72 result = await conn.scalar(sa.text('SHOW server_version_num'))
73 server_version = int(result)
74 except (PGCORE_ERROR, sa.exc.OperationalError):
77 if server_version >= 110000:
78 @sa.event.listens_for(engine.sync_engine, "connect")
79 def _on_connect(dbapi_con: Any, _: Any) -> None:
80 cursor = dbapi_con.cursor()
81 cursor.execute("SET jit_above_cost TO '-1'")
82 cursor.execute("SET max_parallel_workers_per_gather TO '0'")
83 # Make sure that all connections get the new settings
86 self._property_cache['DB:server_version'] = server_version
88 self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
92 async def close(self) -> None:
93 """ Close all active connections to the database. The NominatimAPIAsync
94 object remains usable after closing. If a new API functions is
95 called, new connections are created.
97 if self._engine is not None:
98 await self._engine.dispose()
101 @contextlib.asynccontextmanager
102 async def begin(self) -> AsyncIterator[SearchConnection]:
103 """ Create a new connection with automatic transaction handling.
105 This function may be used to get low-level access to the database.
106 Refer to the documentation of SQLAlchemy for details how to use
107 the connection object.
109 if self._engine is None:
110 await self.setup_database()
112 assert self._engine is not None
113 assert self._tables is not None
115 async with self._engine.begin() as conn:
116 yield SearchConnection(conn, self._tables, self._property_cache)
119 async def status(self) -> StatusResult:
120 """ Return the status of the database.
123 async with self.begin() as conn:
124 status = await get_status(conn)
125 except (PGCORE_ERROR, sa.exc.OperationalError):
126 return StatusResult(700, 'Database connection failed')
131 async def details(self, place: PlaceRef,
132 details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
133 """ Get detailed information about a place in the database.
135 Returns None if there is no entry under the given ID.
137 async with self.begin() as conn:
138 return await get_detailed_place(conn, place, details or LookupDetails())
141 async def lookup(self, places: Sequence[PlaceRef],
142 details: Optional[LookupDetails] = None) -> SearchResults:
143 """ Get simple information about a list of places.
145 Returns a list of place information for all IDs that were found.
148 details = LookupDetails()
149 async with self.begin() as conn:
150 return SearchResults(filter(None,
151 [await get_simple_place(conn, p, details) for p in places]))
154 async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
155 layer: Optional[DataLayer] = None,
156 details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
157 """ Find a place by its coordinates. Also known as reverse geocoding.
159 Returns the closest result that can be found or None if
160 no place matches the given criteria.
162 # The following negation handles NaN correctly. Don't change.
163 if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
164 # There are no results to be expected outside valid coordinates.
168 layer = DataLayer.ADDRESS | DataLayer.POI
170 max_rank = max(0, min(max_rank or 30, 30))
172 async with self.begin() as conn:
173 geocoder = ReverseGeocoder(conn, max_rank, layer,
174 details or LookupDetails())
175 return await geocoder.lookup(coord)
179 """ API loader, synchronous version.
182 def __init__(self, project_dir: Path,
183 environ: Optional[Mapping[str, str]] = None) -> None:
184 self._loop = asyncio.new_event_loop()
185 self._async_api = NominatimAPIAsync(project_dir, environ)
188 def close(self) -> None:
189 """ Close all active connections to the database. The NominatimAPIAsync
190 object remains usable after closing. If a new API functions is
191 called, new connections are created.
193 self._loop.run_until_complete(self._async_api.close())
198 def config(self) -> Configuration:
199 """ Return the configuration used by the API.
201 return self._async_api.config
203 def status(self) -> StatusResult:
204 """ Return the status of the database.
206 return self._loop.run_until_complete(self._async_api.status())
209 def details(self, place: PlaceRef,
210 details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
211 """ Get detailed information about a place in the database.
213 return self._loop.run_until_complete(self._async_api.details(place, details))
216 def lookup(self, places: Sequence[PlaceRef],
217 details: Optional[LookupDetails] = None) -> SearchResults:
218 """ Get simple information about a list of places.
220 Returns a list of place information for all IDs that were found.
222 return self._loop.run_until_complete(self._async_api.lookup(places, details))
225 def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
226 layer: Optional[DataLayer] = None,
227 details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
228 """ Find a place by its coordinates. Also known as reverse geocoding.
230 Returns the closest result that can be found or None if
231 no place matches the given criteria.
233 return self._loop.run_until_complete(
234 self._async_api.reverse(coord, max_rank, layer, details))