1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Extended SQLAlchemy connection class that also includes access to the schema.
 
  10 from typing import Any, Mapping, Sequence, Union, Dict, cast
 
  12 import sqlalchemy as sa
 
  13 from sqlalchemy.ext.asyncio import AsyncConnection
 
  15 from nominatim.db.sqlalchemy_schema import SearchTables
 
  16 from nominatim.api.logging import log
 
  18 class SearchConnection:
 
  19     """ An extended SQLAlchemy connection class, that also contains
 
  20         then table definitions. The underlying asynchronous SQLAlchemy
 
  21         connection can be accessed with the 'connection' property.
 
  22         The 't' property is the collection of Nominatim tables.
 
  25     def __init__(self, conn: AsyncConnection,
 
  27                  properties: Dict[str, Any]) -> None:
 
  28         self.connection = conn
 
  29         self.t = tables # pylint: disable=invalid-name
 
  30         self._property_cache = properties
 
  33     async def scalar(self, sql: sa.sql.base.Executable,
 
  34                      params: Union[Mapping[str, Any], None] = None
 
  36         """ Execute a 'scalar()' query on the connection.
 
  38         log().sql(self.connection, sql)
 
  39         return await self.connection.scalar(sql, params)
 
  42     async def execute(self, sql: 'sa.Executable',
 
  43                       params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None] = None
 
  44                      ) -> 'sa.Result[Any]':
 
  45         """ Execute a 'execute()' query on the connection.
 
  47         log().sql(self.connection, sql)
 
  48         return await self.connection.execute(sql, params)
 
  51     async def get_property(self, name: str, cached: bool = True) -> str:
 
  52         """ Get a property from Nominatim's property table.
 
  54             Property values are normally cached so that they are only
 
  55             retrieved from the database when they are queried for the
 
  56             first time with this function. Set 'cached' to False to force
 
  57             reading the property from the database.
 
  59             Raises a ValueError if the property does not exist.
 
  61         if name.startswith('DB:'):
 
  62             raise ValueError(f"Illegal property value '{name}'.")
 
  64         if cached and name in self._property_cache:
 
  65             return cast(str, self._property_cache[name])
 
  67         sql = sa.select(self.t.properties.c.value)\
 
  68             .where(self.t.properties.c.property == name)
 
  69         value = await self.connection.scalar(sql)
 
  72             raise ValueError(f"Property '{name}' not found in database.")
 
  74         self._property_cache[name] = cast(str, value)
 
  76         return cast(str, value)
 
  79     async def get_db_property(self, name: str) -> Any:
 
  80         """ Get a setting from the database. At the moment, only
 
  81             'server_version', the version of the database software, can
 
  82             be retrieved with this function.
 
  84             Raises a ValueError if the property does not exist.
 
  86         if name != 'server_version':
 
  87             raise ValueError(f"DB setting '{name}' not found in database.")
 
  89         return self._property_cache['DB:server_version']