1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Tests for DB utility functions in db.utils
 
  14 import nominatim.db.utils as db_utils
 
  15 from nominatim.errors import UsageError
 
  17 def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
 
  18     tmpfile = tmp_path / 'test.sql'
 
  19     tmpfile.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')
 
  21     db_utils.execute_file(dsn, tmpfile)
 
  23     assert temp_db_cursor.row_set('SELECT * FROM test') == {(56, )}
 
  25 def test_execute_file_bad_file(dsn, tmp_path):
 
  26     with pytest.raises(FileNotFoundError):
 
  27         db_utils.execute_file(dsn, tmp_path / 'test2.sql')
 
  30 def test_execute_file_bad_sql(dsn, tmp_path):
 
  31     tmpfile = tmp_path / 'test.sql'
 
  32     tmpfile.write_text('CREATE STABLE test (id INT)')
 
  34     with pytest.raises(UsageError):
 
  35         db_utils.execute_file(dsn, tmpfile)
 
  38 def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
 
  39     tmpfile = tmp_path / 'test.sql'
 
  40     tmpfile.write_text('CREATE STABLE test (id INT)')
 
  42     db_utils.execute_file(dsn, tmpfile, ignore_errors=True)
 
  45 def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
 
  46     tmpfile = tmp_path / 'test.sql'
 
  47     tmpfile.write_text('INSERT INTO test VALUES(4)')
 
  49     db_utils.execute_file(dsn, tmpfile, pre_code='CREATE TABLE test (id INT)')
 
  51     assert temp_db_cursor.row_set('SELECT * FROM test') == {(4, )}
 
  54 def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
 
  55     tmpfile = tmp_path / 'test.sql'
 
  56     tmpfile.write_text('CREATE TABLE test (id INT)')
 
  58     db_utils.execute_file(dsn, tmpfile, post_code='INSERT INTO test VALUES(23)')
 
  60     assert temp_db_cursor.row_set('SELECT * FROM test') == {(23, )}
 
  64     TABLE_NAME = 'copytable'
 
  66     @pytest.fixture(autouse=True)
 
  67     def setup_test_table(self, table_factory):
 
  68         table_factory(self.TABLE_NAME, 'colA INT, colB TEXT')
 
  71     def table_rows(self, cursor):
 
  72         return cursor.row_set('SELECT * FROM ' + self.TABLE_NAME)
 
  75     def test_copybuffer_empty(self):
 
  76         with db_utils.CopyBuffer() as buf:
 
  77             buf.copy_out(None, "dummy")
 
  80     def test_all_columns(self, temp_db_cursor):
 
  81         with db_utils.CopyBuffer() as buf:
 
  85             buf.copy_out(temp_db_cursor, self.TABLE_NAME)
 
  87         assert self.table_rows(temp_db_cursor) == {(3, 'hum'), (None, 'f\\t')}
 
  90     def test_selected_columns(self, temp_db_cursor):
 
  91         with db_utils.CopyBuffer() as buf:
 
  94             buf.copy_out(temp_db_cursor, self.TABLE_NAME,
 
  97         assert self.table_rows(temp_db_cursor) == {(None, 'foo')}
 
 100     def test_reordered_columns(self, temp_db_cursor):
 
 101         with db_utils.CopyBuffer() as buf:
 
 105             buf.copy_out(temp_db_cursor, self.TABLE_NAME,
 
 106                          columns=['colB', 'colA'])
 
 108         assert self.table_rows(temp_db_cursor) == {(1, 'one'), (2, ' two ')}
 
 111     def test_special_characters(self, temp_db_cursor):
 
 112         with db_utils.CopyBuffer() as buf:
 
 117             buf.copy_out(temp_db_cursor, self.TABLE_NAME,
 
 120         assert self.table_rows(temp_db_cursor) == {(None, 'foo\tbar'),
 
 126 class TestCopyBufferJson:
 
 127     TABLE_NAME = 'copytable'
 
 129     @pytest.fixture(autouse=True)
 
 130     def setup_test_table(self, table_factory):
 
 131         table_factory(self.TABLE_NAME, 'colA INT, colB JSONB')
 
 134     def table_rows(self, cursor):
 
 135         cursor.execute('SELECT * FROM ' + self.TABLE_NAME)
 
 136         results = {k: v for k,v in cursor}
 
 138         assert len(results) == cursor.rowcount
 
 143     def test_json_object(self, temp_db_cursor):
 
 144         with db_utils.CopyBuffer() as buf:
 
 145             buf.add(1, json.dumps({'test': 'value', 'number': 1}))
 
 147             buf.copy_out(temp_db_cursor, self.TABLE_NAME)
 
 149         assert self.table_rows(temp_db_cursor) == \
 
 150                    {1: {'test': 'value', 'number': 1}}
 
 153     def test_json_object_special_chras(self, temp_db_cursor):
 
 154         with db_utils.CopyBuffer() as buf:
 
 155             buf.add(1, json.dumps({'te\tst': 'va\nlue', 'nu"mber': None}))
 
 157             buf.copy_out(temp_db_cursor, self.TABLE_NAME)
 
 159         assert self.table_rows(temp_db_cursor) == \
 
 160                    {1: {'te\tst': 'va\nlue', 'nu"mber': None}}