1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """ Non-blocking database connections.
9 from typing import Callable, Any, Optional, Iterator, Sequence
15 from psycopg2.extras import wait_select
17 # psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
18 # module is available and adapt the error handling accordingly.
20 import psycopg2.errors # pylint: disable=no-name-in-module,import-error
21 __has_psycopg2_errors__ = True
23 __has_psycopg2_errors__ = False
25 from nominatim.typing import T_cursor, Query
27 LOG = logging.getLogger()
class DeadlockHandler:
    """ Context manager that catches deadlock exceptions and calls
        the given handler function. All other exceptions are passed on
        normally, except that psycopg2 errors are logged and suppressed
        when `ignore_sql_errors` is set.
    """

    def __init__(self, handler: Callable[[], None], ignore_sql_errors: bool = False) -> None:
        self.handler = handler
        self.ignore_sql_errors = ignore_sql_errors

    def __enter__(self) -> 'DeadlockHandler':
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> bool:
        """ Call the handler and suppress the exception when the block
            was left because of a deadlock. Returns True when the
            exception was handled here, False to let it propagate.
        """
        if __has_psycopg2_errors__:
            # psycopg2 >= 2.8 has a dedicated exception class for deadlocks.
            if exc_type == psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
                self.handler()
                return True
        elif exc_type == psycopg2.extensions.TransactionRollbackError \
             and exc_value.pgcode == '40P01':
            # Older psycopg2 only reports the SQLSTATE code of the deadlock.
            self.handler()
            return True

        if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
            LOG.info("SQL error ignored: %s", exc_value)
            return True

        return False
60 """ A single non-blocking database connection.
def __init__(self, dsn: str,
             cursor_factory: Optional[Callable[..., T_cursor]] = None,
             ignore_sql_errors: bool = False) -> None:
    """ Initialise the connection state and open the connection.

        `dsn` is the connection string handed to psycopg2,
        `cursor_factory` an optional factory for the cursor created
        on the connection and `ignore_sql_errors` is passed on to the
        deadlock handler used while waiting for results.
    """
    # connect() reads the DSN from the instance, so it has to be set
    # before the connection is opened.
    self.dsn = dsn

    # Last query sent, kept so that it can be repeated after a deadlock.
    self.current_query: Optional[Query] = None
    self.current_params: Optional[Sequence[Any]] = None
    self.ignore_sql_errors = ignore_sql_errors

    self.conn: Optional['psycopg2.connection'] = None
    self.cursor: Optional['psycopg2.cursor'] = None
    self.connect(cursor_factory=cursor_factory)
def close(self) -> None:
    """ Close all open connections. Does not wait for pending requests.

        Calling this on an already closed connection does nothing.
    """
    if self.conn is not None:
        if self.cursor is not None:
            self.cursor.close() # type: ignore[no-untyped-call]
            self.cursor = None
        self.conn.close()

    # Drop the handle so that a second close() is a no-op.
    self.conn = None
def connect(self, cursor_factory: Optional[Callable[..., T_cursor]] = None) -> None:
    """ (Re)connect to the database. Creates an asynchronous connection
        with JIT and parallel processing disabled. If a connection was
        already open, it is closed and a new connection established.
        The caller must ensure that no query is pending before reconnecting.
    """
    self.close()

    # Use a dict to hand in the parameters because async is a reserved
    # word in Python 3.
    self.conn = psycopg2.connect(**{'dsn': self.dsn, 'async': True})
    # An asynchronous connection has to be polled until it is ready
    # before it can be used.
    self.wait()

    if cursor_factory is not None:
        self.cursor = self.conn.cursor(cursor_factory=cursor_factory)
    else:
        self.cursor = self.conn.cursor()
    # Disable JIT and parallel workers as they are known to cause problems.
    # Update pg_settings instead of using SET because it does not yield
    # errors on older versions of Postgres where the settings are not
    # available.
    self.perform(
        """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
            UPDATE pg_settings SET setting = 0
               WHERE name = 'max_parallel_workers_per_gather';""")
    self.wait()
def _deadlock_handler(self) -> None:
    """ Send the current query again after it was aborted by a deadlock.
    """
    LOG.info("Deadlock detected (params = %s), retry.", self.current_params)
    assert self.cursor is not None
    assert self.current_query is not None

    # The parameters may legitimately be None: perform() accepts queries
    # without arguments, and those must be repeatable as well.
    self.cursor.execute(self.current_query, self.current_params)
def wait(self) -> None:
    """ Block until any pending operation is done.
    """
    # When the deadlock handler swallows an exception, the query has been
    # sent again and needs another round of waiting. Only leave the loop
    # once wait_select() has come back without an error.
    while True:
        with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
            wait_select(self.conn)
            self.current_query = None
            return
def perform(self, sql: Query, args: Optional[Sequence[Any]] = None) -> None:
    """ Send SQL query to the server. Returns immediately without
        waiting for the result.

        The query and its parameters are remembered on the instance.
    """
    assert self.cursor is not None
    self.current_query, self.current_params = sql, args
    self.cursor.execute(sql, args)
def fileno(self) -> int:
    """ File descriptor to wait for. (Makes this class select()able.)
    """
    connection = self.conn
    assert connection is not None
    return connection.fileno()
def is_done(self) -> bool:
    """ Check if the connection is available for a new query.

        Also checks if the previous query has run into a deadlock.
        If so, then the previous query is repeated.

        Returns True when no query is pending, False otherwise.
    """
    assert self.conn is not None

    # Nothing was sent, so the connection is free.
    if self.current_query is None:
        return True

    with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
        if self.conn.poll() == psycopg2.extensions.POLL_OK:
            self.current_query = None
            return True

    # Either the query is still running or the handler has swallowed an
    # exception raised while polling.
    return False
166 """ A pool of asynchronous database connections.
168 The pool may be used as a context manager.
170 REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, dsn: str, pool_size: int, ignore_sql_errors: bool = False) -> None:
    """ Open `pool_size` connections to the database given by `dsn`.
        `ignore_sql_errors` is handed on to every connection.
    """
    self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
                    for _ in range(pool_size)]
    self.free_workers = self._yield_free_worker()
    # Accumulated time spent blocked in select() while waiting for a
    # free connection. Must exist before the first `+=` on it.
    self.wait_time = 0.0
def finish_all(self) -> None:
    """ Wait for all connection to finish.
    """
    for thread in self.threads:
        # is_done() may have re-sent a deadlocked query, so keep
        # waiting until the connection reports that it is free.
        while not thread.is_done():
            thread.wait()

    # Start over with a fresh iterator over the free connections.
    self.free_workers = self._yield_free_worker()
def close(self) -> None:
    """ Close all connections and clear the pool.
    """
    for thread in self.threads:
        thread.close()
    self.threads = []
    # No connections are left, so there is nothing to hand out either.
    self.free_workers = iter([])
def next_free_worker(self) -> DBConnection:
    """ Get the next free connection.
    """
    worker = next(self.free_workers)
    return worker
203 def _yield_free_worker(self) -> Iterator[DBConnection]:
212 if command_stat > self.REOPEN_CONNECTIONS_AFTER:
213 self._reconnect_threads()
218 _, ready, _ = select.select([], self.threads, [])
219 self.wait_time += time.time() - tstart
def _reconnect_threads(self) -> None:
    """ Reopen the database connection of every worker in the pool.
    """
    for thread in self.threads:
        # No query may be pending when reconnecting, so let the
        # connection finish its work first.
        while not thread.is_done():
            thread.wait()
        thread.connect()
229 def __enter__(self) -> 'WorkerPool':
233 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: