2 Tests for function providing a non-blocking query interface towards PostgreSQL.
 
   4 from contextlib import closing
 
   5 import concurrent.futures
 
  10 from nominatim.db.async_connection import DBConnection, DeadlockHandler
 
  15     with closing(DBConnection('dbname=' + temp_db)) as connection:
 
  20 def simple_conns(temp_db):
 
  21     conn1 = psycopg2.connect('dbname=' + temp_db)
 
  22     conn2 = psycopg2.connect('dbname=' + temp_db)
 
  24     yield conn1.cursor(), conn2.cursor()
 
  30 def test_simple_query(conn, temp_db_conn):
 
  33     conn.perform('CREATE TABLE foo (id INT)')
 
  36     temp_db_conn.table_exists('foo')
 
  39 def test_wait_for_query(conn):
 
  42     conn.perform('SELECT pg_sleep(1)')
 
  44     assert not conn.is_done()
 
  49 def test_bad_query(conn):
 
  52     conn.perform('SELECT efasfjsea')
 
  54     with pytest.raises(psycopg2.ProgrammingError):
 
  58 def test_bad_query_ignore(temp_db):
 
  59     with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
 
  62         conn.perform('SELECT efasfjsea')
 
  67 def exec_with_deadlock(cur, sql, detector):
 
  68     with DeadlockHandler(lambda *args: detector.append(1)):
 
  72 def test_deadlock(simple_conns):
 
  73     cur1, cur2 = simple_conns
 
  75     cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
 
  76                     INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
 
  77     cur1.connection.commit()
 
  79     cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
 
  80     cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
 
  82     # This is the tricky part of the test. The first SQL command runs into
 
  83     # a lock and blocks, so we have to run it in a separate thread. When the
 
  84     # second deadlocking SQL statement is issued, Postgresql will abort one of
 
  85     # the two transactions that cause the deadlock. There is no way to tell
 
  86     # which one of the two. Therefore wrap both in a DeadlockHandler and
 
  87     # expect that exactly one of the two triggers.
 
  88     with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
 
  91             future = executor.submit(exec_with_deadlock, cur2,
 
  92                                      "UPDATE t1 SET t = 'y' WHERE id = 1",
 
  95             while not future.running():
 
  99             exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
 
 102             # Whatever happens, make sure the deadlock gets resolved.
 
 103             cur1.connection.rollback()
 
 107         assert len(deadlock_check) == 1