1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Extended SQLAlchemy connection class that also includes access to the schema.
 
  10 from typing import cast, Any, Mapping, Sequence, Union, Dict, Optional, Set, \
 
  11                    Awaitable, Callable, TypeVar
 
  14 import sqlalchemy as sa
 
  15 from sqlalchemy.ext.asyncio import AsyncConnection
 
  17 from nominatim.typing import SaFromClause
 
  18 from nominatim.db.sqlalchemy_schema import SearchTables
 
  19 from nominatim.db.sqlalchemy_types import Geometry
 
  20 from nominatim.api.logging import log
 
  24 class SearchConnection:
 
  25     """ An extended SQLAlchemy connection class, that also contains
 
  26         then table definitions. The underlying asynchronous SQLAlchemy
 
  27         connection can be accessed with the 'connection' property.
 
  28         The 't' property is the collection of Nominatim tables.
 
  31     def __init__(self, conn: AsyncConnection,
 
  33                  properties: Dict[str, Any]) -> None:
 
  34         self.connection = conn
 
  35         self.t = tables # pylint: disable=invalid-name
 
  36         self._property_cache = properties
 
  37         self._classtables: Optional[Set[str]] = None
 
  38         self.query_timeout: Optional[int] = None
 
  41     def set_query_timeout(self, timeout: Optional[int]) -> None:
 
  42         """ Set the timeout after which a query over this connection
 
  45         self.query_timeout = timeout
 
  48     async def scalar(self, sql: sa.sql.base.Executable,
 
  49                      params: Union[Mapping[str, Any], None] = None
 
  51         """ Execute a 'scalar()' query on the connection.
 
  53         log().sql(self.connection, sql, params)
 
  54         return await asyncio.wait_for(self.connection.scalar(sql, params), self.query_timeout)
 
  57     async def execute(self, sql: 'sa.Executable',
 
  58                       params: Union[Mapping[str, Any], Sequence[Mapping[str, Any]], None] = None
 
  59                      ) -> 'sa.Result[Any]':
 
  60         """ Execute a 'execute()' query on the connection.
 
  62         log().sql(self.connection, sql, params)
 
  63         return await asyncio.wait_for(self.connection.execute(sql, params), self.query_timeout)
 
  66     async def get_property(self, name: str, cached: bool = True) -> str:
 
  67         """ Get a property from Nominatim's property table.
 
  69             Property values are normally cached so that they are only
 
  70             retrieved from the database when they are queried for the
 
  71             first time with this function. Set 'cached' to False to force
 
  72             reading the property from the database.
 
  74             Raises a ValueError if the property does not exist.
 
  76         lookup_name = f'DBPROP:{name}'
 
  78         if cached and lookup_name in self._property_cache:
 
  79             return cast(str, self._property_cache[lookup_name])
 
  81         sql = sa.select(self.t.properties.c.value)\
 
  82             .where(self.t.properties.c.property == name)
 
  83         value = await self.connection.scalar(sql)
 
  86             raise ValueError(f"Property '{name}' not found in database.")
 
  88         self._property_cache[lookup_name] = cast(str, value)
 
  90         return cast(str, value)
 
  93     async def get_db_property(self, name: str) -> Any:
 
  94         """ Get a setting from the database. At the moment, only
 
  95             'server_version', the version of the database software, can
 
  96             be retrieved with this function.
 
  98             Raises a ValueError if the property does not exist.
 
 100         if name != 'server_version':
 
 101             raise ValueError(f"DB setting '{name}' not found in database.")
 
 103         return self._property_cache['DB:server_version']
 
 106     async def get_cached_value(self, group: str, name: str,
 
 107                                factory: Callable[[], Awaitable[T]]) -> T:
 
 108         """ Access the cache for this Nominatim instance.
 
 109             Each cache value needs to belong to a group and have a name.
 
 110             This function is for internal API use only.
 
 112             `factory` is an async callback function that produces
 
 113             the value if it is not already cached.
 
 115             Returns the cached value or the result of factory (also caching
 
 118         full_name = f'{group}:{name}'
 
 120         if full_name in self._property_cache:
 
 121             return cast(T, self._property_cache[full_name])
 
 123         value = await factory()
 
 124         self._property_cache[full_name] = value
 
 129     async def get_class_table(self, cls: str, typ: str) -> Optional[SaFromClause]:
 
 130         """ Lookup up if there is a classtype table for the given category
 
 131             and return a SQLAlchemy table for it, if it exists.
 
 133         if self._classtables is None:
 
 134             res = await self.execute(sa.text("""SELECT tablename FROM pg_tables
 
 135                                                 WHERE tablename LIKE 'place_classtype_%'
 
 137             self._classtables = {r[0] for r in res}
 
 139         tablename = f"place_classtype_{cls}_{typ}"
 
 141         if tablename not in self._classtables:
 
 144         if tablename in self.t.meta.tables:
 
 145             return self.t.meta.tables[tablename]
 
 147         return sa.Table(tablename, self.t.meta,
 
 148                         sa.Column('place_id', sa.BigInteger),
 
 149                         sa.Column('centroid', Geometry))