]> git.openstreetmap.org Git - nominatim.git/commitdiff
improve deadlock detection for various versions of psycopg2
authorSarah Hoffmann <lonvia@denofr.de>
Thu, 25 Feb 2021 16:36:31 +0000 (17:36 +0100)
committerSarah Hoffmann <lonvia@denofr.de>
Thu, 25 Feb 2021 17:11:16 +0000 (18:11 +0100)
Psycopg2 has changed the kind of exception that is emitted on
deadlocks between versions 2.7 and 2.8. The code was already
trying to catch both kind of errors but because the
psycopg2.errors package is unknown in 2.7 and below, the
code would throw an exception on anything but a deadlock error.

This commit wraps the deadlock handling into a context manager
to avoid code duplication and uses module imports to detect if
the new error codes are available.

Also sets the required psycopg2 version to 2.7 or bigger as
versions below are difficult to test.

docs/admin/Installation.md
nominatim/db/async_connection.py
test/python/test_db_async_connection.py [new file with mode: 0644]

index 0013e993d2f7188a106dcfedf67581b6038961ed..05e57f9375c05e257a430329f36e554200824a30 100644 (file)
@@ -39,7 +39,7 @@ For running Nominatim:
   * [PostgreSQL](https://www.postgresql.org) (9.3+)
   * [PostGIS](https://postgis.net) (2.2+)
   * [Python 3](https://www.python.org/) (3.5+)
-  * [Psycopg2](https://www.psycopg.org)
+  * [Psycopg2](https://www.psycopg.org) (2.7+)
   * [Python Dotenv](https://github.com/theskumar/python-dotenv)
   * [PHP](https://php.net) (7.0 or later)
   * PHP-pgsql
index 45e83664663ba835419db49304209b9da6491d35..c5d6872bf0790abfaa10f7b07d2567ba0913663d 100644 (file)
@@ -9,8 +9,41 @@ import logging
 import psycopg2
 from psycopg2.extras import wait_select
 
+# psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
+# module is available and adapt the error handling accordingly.
+try:
+    import psycopg2.errors # pylint: disable=no-name-in-module,import-error
+    __has_psycopg2_errors__ = True
+except ModuleNotFoundError:
+    __has_psycopg2_errors__ = False
+
 LOG = logging.getLogger()
 
+class DeadlockHandler:
+    """ Context manager that catches deadlock exceptions and calls
+        the given handler function. All other exceptions are passed on
+        normally.
+    """
+
+    def __init__(self, handler):
+        self.handler = handler
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        if __has_psycopg2_errors__:
+            if exc_type == psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
+                self.handler()
+                return True
+        else:
+            if exc_type == psycopg2.extensions.TransactionRollbackError:
+                if exc_value.pgcode == '40P01':
+                    self.handler()
+                    return True
+        return False
+
+
 class DBConnection:
     """ A single non-blocking database connection.
     """
@@ -24,15 +57,22 @@ class DBConnection:
         self.cursor = None
         self.connect()
 
+    def close(self):
+        """ Close all open connections. Does not wait for pending requests.
+        """
+        if self.conn is not None:
+            self.cursor.close()
+            self.conn.close()
+
+        self.conn = None
+
     def connect(self):
         """ (Re)connect to the database. Creates an asynchronous connection
             with JIT and parallel processing disabled. If a connection was
             already open, it is closed and a new connection established.
             The caller must ensure that no query is pending before reconnecting.
         """
-        if self.conn is not None:
-            self.cursor.close()
-            self.conn.close()
+        self.close()
 
         # Use a dict to hand in the parameters because async is a reserved
         # word in Python3.
@@ -50,23 +90,18 @@ class DBConnection:
                    WHERE name = 'max_parallel_workers_per_gather';""")
         self.wait()
 
+    def _deadlock_handler(self):
+        LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
+        self.cursor.execute(self.current_query, self.current_params)
+
     def wait(self):
         """ Block until any pending operation is done.
         """
         while True:
-            try:
+            with DeadlockHandler(self._deadlock_handler):
                 wait_select(self.conn)
                 self.current_query = None
                 return
-            except psycopg2.extensions.TransactionRollbackError as error:
-                if error.pgcode == '40P01':
-                    LOG.info("Deadlock detected (params = %s), retry.",
-                             str(self.current_params))
-                    self.cursor.execute(self.current_query, self.current_params)
-                else:
-                    raise
-            except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
-                self.cursor.execute(self.current_query, self.current_params)
 
     def perform(self, sql, args=None):
         """ Send SQL query to the server. Returns immediately without
@@ -90,17 +125,9 @@ class DBConnection:
         if self.current_query is None:
             return True
 
-        try:
+        with DeadlockHandler(self._deadlock_handler):
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True
-        except psycopg2.extensions.TransactionRollbackError as error:
-            if error.pgcode == '40P01':
-                LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
-                self.cursor.execute(self.current_query, self.current_params)
-            else:
-                raise
-        except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
-            self.cursor.execute(self.current_query, self.current_params)
 
         return False
diff --git a/test/python/test_db_async_connection.py b/test/python/test_db_async_connection.py
new file mode 100644 (file)
index 0000000..b52f705
--- /dev/null
@@ -0,0 +1,102 @@
+"""
+Tests for function providing a non-blocking query interface towards PostgreSQL.
+"""
+from contextlib import closing
+import concurrent.futures
+
+import pytest
+import psycopg2
+from psycopg2.extras import wait_select
+
+from nominatim.db.async_connection import DBConnection, DeadlockHandler
+
+
+@pytest.fixture
+def conn(temp_db):
+    with closing(DBConnection('dbname=' + temp_db)) as c:
+        yield c
+
+
+@pytest.fixture
+def simple_conns(temp_db):
+    conn1 = psycopg2.connect('dbname=' + temp_db)
+    conn2 = psycopg2.connect('dbname=' + temp_db)
+
+    yield conn1.cursor(), conn2.cursor()
+
+    conn1.close()
+    conn2.close()
+
+
+def test_simple_query(conn, temp_db_conn):
+    conn.connect()
+
+    conn.perform('CREATE TABLE foo (id INT)')
+    conn.wait()
+
+    temp_db_conn.table_exists('foo')
+
+
+def test_wait_for_query(conn):
+    conn.connect()
+
+    conn.perform('SELECT pg_sleep(1)')
+
+    assert not conn.is_done()
+
+    conn.wait()
+
+
+def test_bad_query(conn):
+    conn.connect()
+
+    conn.perform('SELECT efasfjsea')
+
+    with pytest.raises(psycopg2.ProgrammingError):
+        conn.wait()
+
+
+def exec_with_deadlock(cur, sql, detector):
+    with DeadlockHandler(lambda *args: detector.append(1)):
+        cur.execute(sql)
+
+
+def test_deadlock(simple_conns):
+    print(psycopg2.__version__)
+    cur1, cur2 = simple_conns
+
+    cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
+                    INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
+    cur1.connection.commit()
+
+    cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
+    cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
+
+    # This is the tricky part of the test. The first SQL command runs into
+    # a lock and blocks, so we have to run it in a separate thread. When the
+    # second deadlocking SQL statement is issued, Postgresql will abort one of
+    # the two transactions that cause the deadlock. There is no way to tell
+    # which one of the two. Therefore wrap both in a DeadlockHandler and
+    # expect that exactly one of the two triggers.
+    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+        deadlock_check = []
+        try:
+            future = executor.submit(exec_with_deadlock, cur2,
+                                     "UPDATE t1 SET t = 'y' WHERE id = 1",
+                                     deadlock_check)
+
+            while not future.running():
+                pass
+
+
+            exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
+                               deadlock_check)
+        finally:
+            # Whatever happens, make sure the deadlock gets resolved.
+            cur1.connection.rollback()
+
+        future.result()
+
+        assert len(deadlock_check) == 1
+
+