1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Tests for specialised connection and cursor classes.
 
  13 import nominatim_db.db.connection as nc
 
  17     with nc.connect(dsn) as conn:
 
  21 def test_connection_table_exists(db, table_factory):
 
  22     assert not nc.table_exists(db, 'foobar')
 
  24     table_factory('foobar')
 
  26     assert nc.table_exists(db, 'foobar')
 
  29 def test_has_column_no_table(db):
 
  30     assert not nc.table_has_column(db, 'sometable', 'somecolumn')
 
  33 @pytest.mark.parametrize('name,result', [('tram', True), ('car', False)])
 
  34 def test_has_column(db, table_factory, name, result):
 
  35     table_factory('stuff', 'tram TEXT')
 
  37     assert nc.table_has_column(db, 'stuff', name) == result
 
  39 def test_connection_index_exists(db, table_factory, temp_db_cursor):
 
  40     assert not nc.index_exists(db, 'some_index')
 
  42     table_factory('foobar')
 
  43     temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')
 
  45     assert nc.index_exists(db, 'some_index')
 
  46     assert nc.index_exists(db, 'some_index', table='foobar')
 
  47     assert not nc.index_exists(db, 'some_index', table='bar')
 
  50 def test_drop_table_existing(db, table_factory):
 
  51     table_factory('dummy')
 
  52     assert nc.table_exists(db, 'dummy')
 
  54     nc.drop_tables(db, 'dummy')
 
  55     assert not nc.table_exists(db, 'dummy')
 
  58 def test_drop_table_non_existing(db):
 
  59     nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre')
 
  62 def test_drop_many_tables(db, table_factory):
 
  63     tables = [f'table{n}' for n in range(5)]
 
  67         assert nc.table_exists(db, t)
 
  69     nc.drop_tables(db, *tables)
 
  72         assert not nc.table_exists(db, t)
 
  75 def test_drop_table_non_existing_force(db):
 
  76     with pytest.raises(psycopg.ProgrammingError, match='.*does not exist.*'):
 
  77         nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre', if_exists=False)
 
  79 def test_connection_server_version_tuple(db):
 
  80     ver = nc.server_version_tuple(db)
 
  82     assert isinstance(ver, tuple)
 
  87 def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
 
  88     ver = nc.postgis_version_tuple(db)
 
  90     assert isinstance(ver, tuple)
 
  95 def test_cursor_scalar(db, table_factory):
 
  96     table_factory('dummy')
 
  98     assert nc.execute_scalar(db, 'SELECT count(*) FROM dummy') == 0
 
 101 def test_cursor_scalar_many_rows(db):
 
 102     with pytest.raises(RuntimeError, match='Query did not return a single row.'):
 
 103         nc.execute_scalar(db, 'SELECT * FROM pg_tables')
 
 106 def test_cursor_scalar_no_rows(db, table_factory):
 
 107     table_factory('dummy')
 
 109     with pytest.raises(RuntimeError, match='Query did not return a single row.'):
 
 110         nc.execute_scalar(db, 'SELECT id FROM dummy')
 
 113 def test_get_pg_env_add_variable(monkeypatch):
 
 114     monkeypatch.delenv('PGPASSWORD', raising=False)
 
 115     env = nc.get_pg_env('user=fooF')
 
 117     assert env['PGUSER'] == 'fooF'
 
 118     assert 'PGPASSWORD' not in env
 
 121 def test_get_pg_env_overwrite_variable(monkeypatch):
 
 122     monkeypatch.setenv('PGUSER', 'some default')
 
 123     env = nc.get_pg_env('user=overwriter')
 
 125     assert env['PGUSER'] == 'overwriter'
 
 128 def test_get_pg_env_ignore_unknown():
 
 129     env = nc.get_pg_env('client_encoding=stuff', base_env={})