1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Implementation of classes for API access via libraries.
 
  10 from typing import Mapping, Optional, Any, AsyncIterator, Dict
 
  13 from pathlib import Path
 
  15 import sqlalchemy as sa
 
  16 import sqlalchemy.ext.asyncio as sa_asyncio
 
  19 from nominatim.db.sqlalchemy_schema import SearchTables
 
  20 from nominatim.config import Configuration
 
  21 from nominatim.api.connection import SearchConnection
 
  22 from nominatim.api.status import get_status, StatusResult
 
  23 from nominatim.api.lookup import get_place_by_id
 
  24 from nominatim.api.reverse import ReverseGeocoder
 
  25 from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
 
  26 from nominatim.api.results import DetailedResult, ReverseResult
 
  29 class NominatimAPIAsync:
 
  30     """ API loader asynchornous version.
 
  32     def __init__(self, project_dir: Path,
 
  33                  environ: Optional[Mapping[str, str]] = None) -> None:
 
  34         self.config = Configuration(project_dir, environ)
 
  35         self.server_version = 0
 
  37         self._engine_lock = asyncio.Lock()
 
  38         self._engine: Optional[sa_asyncio.AsyncEngine] = None
 
  39         self._tables: Optional[SearchTables] = None
 
  40         self._property_cache: Dict[str, Any] = {'DB:server_version': 0}
 
  43     async def setup_database(self) -> None:
 
  44         """ Set up the engine and connection parameters.
 
  46             This function will be implicitly called when the database is
 
  47             accessed for the first time. You may also call it explicitly to
 
  48             avoid that the first call is delayed by the setup.
 
  50         async with self._engine_lock:
 
  54             dsn = self.config.get_database_params()
 
  56             dburl = sa.engine.URL.create(
 
  58                        database=dsn.get('dbname'),
 
  59                        username=dsn.get('user'), password=dsn.get('password'),
 
  60                        host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
 
  61                        query={k: v for k, v in dsn.items()
 
  62                               if k not in ('user', 'password', 'dbname', 'host', 'port')})
 
  63             engine = sa_asyncio.create_async_engine(
 
  65                              connect_args={'server_settings': {
 
  66                                 'DateStyle': 'sql,european',
 
  67                                 'max_parallel_workers_per_gather': '0'
 
  71                 async with engine.begin() as conn:
 
  72                     result = await conn.scalar(sa.text('SHOW server_version_num'))
 
  73                     server_version = int(result)
 
  74             except asyncpg.PostgresError:
 
  77             if server_version >= 110000:
 
  78                 @sa.event.listens_for(engine.sync_engine, "connect")
 
  79                 def _on_connect(dbapi_con: Any, _: Any) -> None:
 
  80                     cursor = dbapi_con.cursor()
 
  81                     cursor.execute("SET jit_above_cost TO '-1'")
 
  82                 # Make sure that all connections get the new settings
 
  85             self._property_cache['DB:server_version'] = server_version
 
  87             self._tables = SearchTables(sa.MetaData(), engine.name) # pylint: disable=no-member
 
  91     async def close(self) -> None:
 
  92         """ Close all active connections to the database. The NominatimAPIAsync
 
  93             object remains usable after closing. If a new API functions is
 
  94             called, new connections are created.
 
  96         if self._engine is not None:
 
  97             await self._engine.dispose()
 
 100     @contextlib.asynccontextmanager
 
 101     async def begin(self) -> AsyncIterator[SearchConnection]:
 
 102         """ Create a new connection with automatic transaction handling.
 
 104             This function may be used to get low-level access to the database.
 
 105             Refer to the documentation of SQLAlchemy for details how to use
 
 106             the connection object.
 
 108         if self._engine is None:
 
 109             await self.setup_database()
 
 111         assert self._engine is not None
 
 112         assert self._tables is not None
 
 114         async with self._engine.begin() as conn:
 
 115             yield SearchConnection(conn, self._tables, self._property_cache)
 
 118     async def status(self) -> StatusResult:
 
 119         """ Return the status of the database.
 
 122             async with self.begin() as conn:
 
 123                 status = await get_status(conn)
 
 124         except asyncpg.PostgresError:
 
 125             return StatusResult(700, 'Database connection failed')
 
 130     async def lookup(self, place: PlaceRef,
 
 131                      details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
 
 132         """ Get detailed information about a place in the database.
 
 134             Returns None if there is no entry under the given ID.
 
 136         async with self.begin() as conn:
 
 137             return await get_place_by_id(conn, place, details or LookupDetails())
 
 140     async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
 
 141                       layer: Optional[DataLayer] = None,
 
 142                       details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
 
 143         """ Find a place by its coordinates. Also known as reverse geocoding.
 
 145             Returns the closest result that can be found or None if
 
 146             no place matches the given criteria.
 
 148         # The following negation handles NaN correctly. Don't change.
 
 149         if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
 
 150             # There are no results to be expected outside valid coordinates.
 
 154             layer = DataLayer.ADDRESS | DataLayer.POI
 
 156         max_rank = max(0, min(max_rank or 30, 30))
 
 158         async with self.begin() as conn:
 
 159             geocoder = ReverseGeocoder(conn, max_rank, layer,
 
 160                                        details or LookupDetails())
 
 161             return await geocoder.lookup(coord)
 
 165     """ API loader, synchronous version.
 
 168     def __init__(self, project_dir: Path,
 
 169                  environ: Optional[Mapping[str, str]] = None) -> None:
 
 170         self._loop = asyncio.new_event_loop()
 
 171         self._async_api = NominatimAPIAsync(project_dir, environ)
 
 174     def close(self) -> None:
 
 175         """ Close all active connections to the database. The NominatimAPIAsync
 
 176             object remains usable after closing. If a new API functions is
 
 177             called, new connections are created.
 
 179         self._loop.run_until_complete(self._async_api.close())
 
 184     def config(self) -> Configuration:
 
 185         """ Return the configuration used by the API.
 
 187         return self._async_api.config
 
 189     def status(self) -> StatusResult:
 
 190         """ Return the status of the database.
 
 192         return self._loop.run_until_complete(self._async_api.status())
 
 195     def lookup(self, place: PlaceRef,
 
 196                details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
 
 197         """ Get detailed information about a place in the database.
 
 199         return self._loop.run_until_complete(self._async_api.lookup(place, details))
 
 202     def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
 
 203                 layer: Optional[DataLayer] = None,
 
 204                 details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
 
 205         """ Find a place by its coordinates. Also known as reverse geocoding.
 
 207             Returns the closest result that can be found or None if
 
 208             no place matches the given criteria.
 
 210         return self._loop.run_until_complete(
 
 211                    self._async_api.reverse(coord, max_rank, layer, details))