1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   9 from pathlib import Path
 
  12 from psycopg import sql as pysql
 
  15 # always test against the source
 
  16 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
 
  17 sys.path.insert(0, str(SRC_DIR / 'src'))
 
  19 from nominatim_db.config import Configuration
 
  20 from nominatim_db.db import connection
 
  21 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
 
  22 import nominatim_db.tokenizer.factory
 
  24 import dummy_tokenizer
 
  26 from cursor import CursorForTesting
 
  35 def temp_db(monkeypatch):
 
  36     """ Create an empty database for the test. The database name is also
 
  37         exported into NOMINATIM_DATABASE_DSN.
 
  39     name = 'test_nominatim_python_unittest'
 
  41     with psycopg.connect(dbname='postgres', autocommit=True) as conn:
 
  42         with conn.cursor() as cur:
 
  43             cur.execute(pysql.SQL('DROP DATABASE IF EXISTS') + pysql.Identifier(name))
 
  44             cur.execute(pysql.SQL('CREATE DATABASE') + pysql.Identifier(name))
 
  46     monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)
 
  48     with psycopg.connect(dbname=name) as conn:
 
  49         with conn.cursor() as cur:
 
  50             cur.execute('CREATE EXTENSION hstore')
 
  54     with psycopg.connect(dbname='postgres', autocommit=True) as conn:
 
  55         with conn.cursor() as cur:
 
  56             cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
 
  61     return 'dbname=' + temp_db
 
  65 def temp_db_with_extensions(temp_db):
 
  66     with psycopg.connect(dbname=temp_db) as conn:
 
  67         with conn.cursor() as cur:
 
  68             cur.execute('CREATE EXTENSION postgis')
 
  74 def temp_db_conn(temp_db):
 
  75     """ Connection to the test database.
 
  77     with connection.connect('', autocommit=True, dbname=temp_db) as conn:
 
  78         connection.register_hstore(conn)
 
  83 def temp_db_cursor(temp_db):
 
  84     """ Connection and cursor towards the test database. The connection will
 
  85         be in auto-commit mode.
 
  87     with psycopg.connect(dbname=temp_db, autocommit=True, cursor_factory=CursorForTesting) as conn:
 
  88         connection.register_hstore(conn)
 
  89         with conn.cursor() as cur:
 
  94 def table_factory(temp_db_conn):
 
  95     """ A fixture that creates new SQL tables, potentially filled with
 
  98     def mk_table(name, definition='id INT', content=None):
 
  99         with psycopg.ClientCursor(temp_db_conn) as cur:
 
 100             cur.execute('CREATE TABLE {} ({})'.format(name, definition))
 
 102                 sql = pysql.SQL("INSERT INTO {} VALUES ({})")\
 
 103                            .format(pysql.Identifier(name),
 
 104                                    pysql.SQL(',').join([pysql.Placeholder()
 
 105                                                         for _ in range(len(content[0]))]))
 
 106                 cur.executemany(sql, content)
 
 113     cfg = Configuration(None)
 
 118 def project_env(tmp_path):
 
 119     projdir = tmp_path / 'project'
 
 121     cfg = Configuration(projdir)
 
 126 def property_table(table_factory, temp_db_conn):
 
 127     table_factory('nominatim_properties', 'property TEXT, value TEXT')
 
 129     return mocks.MockPropertyTable(temp_db_conn)
 
 133 def status_table(table_factory):
 
 134     """ Create an empty version of the status table and
 
 135         the status logging table.
 
 137     table_factory('import_status',
 
 138                   """lastimportdate timestamp with time zone NOT NULL,
 
 141     table_factory('import_osmosis_log',
 
 142                   """batchend timestamp,
 
 151 def place_table(temp_db_with_extensions, table_factory):
 
 152     """ Create an empty version of the place table.
 
 154     table_factory('place',
 
 155                   """osm_id int8 NOT NULL,
 
 156                      osm_type char(1) NOT NULL,
 
 160                      admin_level smallint,
 
 163                      geometry Geometry(Geometry,4326) NOT NULL""")
 
 167 def place_row(place_table, temp_db_cursor):
 
 168     """ A factory for rows in the place table. The table is created as a
 
 169         prerequisite to the fixture.
 
 171     idseq = itertools.count(1001)
 
 172     def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
 
 173                 admin_level=None, address=None, extratags=None, geom=None):
 
 174         temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
 
 175                                (osm_id or next(idseq), osm_type, cls, typ, names,
 
 176                                 admin_level, address, extratags,
 
 177                                 geom or 'SRID=4326;POINT(0 0)'))
 
 183 def placex_table(temp_db_with_extensions, temp_db_conn):
 
 184     """ Create an empty version of the place table.
 
 186     return mocks.MockPlacexTable(temp_db_conn)
 
 190 def osmline_table(temp_db_with_extensions, table_factory):
 
 191     table_factory('location_property_osmline',
 
 194                      parent_place_id BIGINT,
 
 195                      geometry_sector INTEGER,
 
 196                      indexed_date TIMESTAMP,
 
 200                      indexed_status SMALLINT,
 
 202                      interpolationtype TEXT,
 
 205                      country_code VARCHAR(2)""")
 
 209 def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions):
 
 210     table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
 
 211     cfg = Configuration(None)
 
 212     cfg.set_libdirs(sql=tmp_path)
 
 217 def sql_preprocessor(sql_preprocessor_cfg, temp_db_conn):
 
 218     return SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg)
 
 222 def tokenizer_mock(monkeypatch, property_table):
 
 223     """ Sets up the configuration so that the test dummy tokenizer will be
 
 224         loaded when the tokenizer factory is used. Also returns a factory
 
 225         with which a new dummy tokenizer may be created.
 
 227     monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
 
 229     def _import_dummy(*args, **kwargs):
 
 230         return dummy_tokenizer
 
 232     monkeypatch.setattr(nominatim_db.tokenizer.factory,
 
 233                         "_import_tokenizer", _import_dummy)
 
 234     property_table.set('tokenizer', 'dummy')
 
 236     def _create_tokenizer():
 
 237         return dummy_tokenizer.DummyTokenizer(None)
 
 239     return _create_tokenizer