]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/db/async_connection.py
move WorkerPool into db module
[nominatim.git] / nominatim / db / async_connection.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim.
4 # Copyright (C) 2021 by the Nominatim developer community.
5 # For a full list of authors see the git log.
6 """ Database helper functions for the indexer.
7 """
8 import logging
9 import select
10 import time
11
12 import psycopg2
13 from psycopg2.extras import wait_select
14
15 # psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
16 # module is available and adapt the error handling accordingly.
17 try:
18     import psycopg2.errors # pylint: disable=no-name-in-module,import-error
19     __has_psycopg2_errors__ = True
20 except ImportError:
21     __has_psycopg2_errors__ = False
22
23 LOG = logging.getLogger()
24
25 class DeadlockHandler:
26     """ Context manager that catches deadlock exceptions and calls
27         the given handler function. All other exceptions are passed on
28         normally.
29     """
30
31     def __init__(self, handler):
32         self.handler = handler
33
34     def __enter__(self):
35         pass
36
37     def __exit__(self, exc_type, exc_value, traceback):
38         if __has_psycopg2_errors__:
39             if exc_type == psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
40                 self.handler()
41                 return True
42         else:
43             if exc_type == psycopg2.extensions.TransactionRollbackError:
44                 if exc_value.pgcode == '40P01':
45                     self.handler()
46                     return True
47         return False
48
49
50 class DBConnection:
51     """ A single non-blocking database connection.
52     """
53
54     def __init__(self, dsn, cursor_factory=None):
55         self.current_query = None
56         self.current_params = None
57         self.dsn = dsn
58
59         self.conn = None
60         self.cursor = None
61         self.connect(cursor_factory=cursor_factory)
62
63     def close(self):
64         """ Close all open connections. Does not wait for pending requests.
65         """
66         if self.conn is not None:
67             self.cursor.close()
68             self.conn.close()
69
70         self.conn = None
71
72     def connect(self, cursor_factory=None):
73         """ (Re)connect to the database. Creates an asynchronous connection
74             with JIT and parallel processing disabled. If a connection was
75             already open, it is closed and a new connection established.
76             The caller must ensure that no query is pending before reconnecting.
77         """
78         self.close()
79
80         # Use a dict to hand in the parameters because async is a reserved
81         # word in Python3.
82         self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True})
83         self.wait()
84
85         self.cursor = self.conn.cursor(cursor_factory=cursor_factory)
86         # Disable JIT and parallel workers as they are known to cause problems.
87         # Update pg_settings instead of using SET because it does not yield
88         # errors on older versions of Postgres where the settings are not
89         # implemented.
90         self.perform(
91             """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
92                 UPDATE pg_settings SET setting = 0
93                    WHERE name = 'max_parallel_workers_per_gather';""")
94         self.wait()
95
96     def _deadlock_handler(self):
97         LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
98         self.cursor.execute(self.current_query, self.current_params)
99
100     def wait(self):
101         """ Block until any pending operation is done.
102         """
103         while True:
104             with DeadlockHandler(self._deadlock_handler):
105                 wait_select(self.conn)
106                 self.current_query = None
107                 return
108
109     def perform(self, sql, args=None):
110         """ Send SQL query to the server. Returns immediately without
111             blocking.
112         """
113         self.current_query = sql
114         self.current_params = args
115         self.cursor.execute(sql, args)
116
117     def fileno(self):
118         """ File descriptor to wait for. (Makes this class select()able.)
119         """
120         return self.conn.fileno()
121
122     def is_done(self):
123         """ Check if the connection is available for a new query.
124
125             Also checks if the previous query has run into a deadlock.
126             If so, then the previous query is repeated.
127         """
128         if self.current_query is None:
129             return True
130
131         with DeadlockHandler(self._deadlock_handler):
132             if self.conn.poll() == psycopg2.extensions.POLL_OK:
133                 self.current_query = None
134                 return True
135
136         return False
137
138
139 class WorkerPool:
140     """ A pool of asynchronous database connections.
141
142         The pool may be used as a context manager.
143     """
144     REOPEN_CONNECTIONS_AFTER = 100000
145
146     def __init__(self, dsn, pool_size):
147         self.threads = [DBConnection(dsn) for _ in range(pool_size)]
148         self.free_workers = self._yield_free_worker()
149         self.wait_time = 0
150
151
152     def finish_all(self):
153         """ Wait for all connection to finish.
154         """
155         for thread in self.threads:
156             while not thread.is_done():
157                 thread.wait()
158
159         self.free_workers = self._yield_free_worker()
160
161     def close(self):
162         """ Close all connections and clear the pool.
163         """
164         for thread in self.threads:
165             thread.close()
166         self.threads = []
167         self.free_workers = None
168
169
170     def next_free_worker(self):
171         """ Get the next free connection.
172         """
173         return next(self.free_workers)
174
175
176     def _yield_free_worker(self):
177         ready = self.threads
178         command_stat = 0
179         while True:
180             for thread in ready:
181                 if thread.is_done():
182                     command_stat += 1
183                     yield thread
184
185             if command_stat > self.REOPEN_CONNECTIONS_AFTER:
186                 for thread in self.threads:
187                     while not thread.is_done():
188                         thread.wait()
189                     thread.connect()
190                 ready = self.threads
191                 command_stat = 0
192             else:
193                 tstart = time.time()
194                 _, ready, _ = select.select([], self.threads, [])
195                 self.wait_time += time.time() - tstart
196
197
198     def __enter__(self):
199         return self
200
201
202     def __exit__(self, exc_type, exc_value, traceback):
203         self.finish_all()
204         self.close()