1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 A connection pool that executes incoming queries in parallel.
 
  10 from typing import Any, Tuple, Optional
 
  17 LOG = logging.getLogger()
 
  19 QueueItem = Optional[Tuple[psycopg.abc.Query, Any]]
 
  23     """ Pool to run SQL queries in parallel asynchronous execution.
 
  25         All queries are run in autocommit mode. If parallel execution leads
 
  26         to a deadlock, then the query is repeated.
 
  27         The results of the queries is discarded.
 
  29     def __init__(self, dsn: str, pool_size: int = 1, **conn_args: Any) -> None:
 
  31         self.query_queue: 'asyncio.Queue[QueueItem]' = asyncio.Queue(maxsize=2 * pool_size)
 
  33         self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args))
 
  34                      for _ in range(pool_size)]
 
  36     async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
 
  37         """ Schedule a query for execution.
 
  40         await self.query_queue.put((query, params))
 
  41         self.wait_time += time.time() - tstart
 
  42         await asyncio.sleep(0)
 
  44     async def finish(self) -> None:
 
  45         """ Wait for all queries to finish and close the pool.
 
  48             await self.query_queue.put(None)
 
  51         await asyncio.wait(self.pool)
 
  52         self.wait_time += time.time() - tstart
 
  54         for task in self.pool:
 
  55             excp = task.exception()
 
  59     async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
 
  60         conn_args['autocommit'] = True
 
  61         aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
 
  63             async with aconn.cursor() as cur:
 
  64                 item = await self.query_queue.get()
 
  65                 while item is not None:
 
  68                             await cur.execute(item[0])
 
  70                             await cur.execute(item[0], item[1])
 
  72                         item = await self.query_queue.get()
 
  73                     except psycopg.errors.DeadlockDetected:
 
  74                         assert item is not None
 
  75                         LOG.info("Deadlock detected (sql = %s, params = %s), retry.",
 
  76                                  str(item[0]), str(item[1]))
 
  77                         # item is still valid here, causing a retry
 
  79     async def __aenter__(self) -> 'QueryPool':
 
  82     async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: