1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Specialised psycopg2 cursor with shortcut functions useful for testing.
 
  10 import psycopg2.extras
 
  12 class CursorForTesting(psycopg2.extras.DictCursor):
 
  13     """ Extension to the DictCursor class that provides execution
 
  14         short-cuts that simplify writing assertions.
 
  17     def scalar(self, sql, params=None):
 
  18         """ Execute a query with a single return value and return this value.
 
  19             Raises an assertion when not exactly one row is returned.
 
  21         self.execute(sql, params)
 
  22         assert self.rowcount == 1
 
  23         return self.fetchone()[0]
 
  26     def row_set(self, sql, params=None):
 
  27         """ Execute a query and return the result as a set of tuples.
 
  28             Fails when the SQL command returns duplicate rows.
 
  30         self.execute(sql, params)
 
  32         result = set((tuple(row) for row in self))
 
  33         assert len(result) == self.rowcount
 
  38     def table_exists(self, table):
 
  39         """ Check that a table with the given name exists in the database.
 
  41         num = self.scalar("""SELECT count(*) FROM pg_tables
 
  42                              WHERE tablename = %s""", (table, ))
 
  46     def index_exists(self, table, index):
 
  47         """ Check that an indexwith the given name exists on the given table.
 
  49         num = self.scalar("""SELECT count(*) FROM pg_indexes
 
  50                              WHERE tablename = %s and indexname = %s""",
 
  55     def table_rows(self, table, where=None):
 
  56         """ Return the number of rows in the given table.
 
  59             return self.scalar('SELECT count(*) FROM ' + table)
 
  61         return self.scalar('SELECT count(*) FROM {} WHERE {}'.format(table, where))
 
  64     def execute_values(self, *args, **kwargs):
 
  65         """ Execute the execute_values() function on the cursor.
 
  67         psycopg2.extras.execute_values(self, *args, **kwargs)