1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
9 from pathlib import Path
12 from psycopg import sql as pysql
15 # always test against the source
16 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
17 sys.path.insert(0, str(SRC_DIR / 'src'))
19 from nominatim_db.config import Configuration
20 from nominatim_db.db import connection, properties
21 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
22 import nominatim_db.tokenizer.factory
24 import dummy_tokenizer
26 from cursor import CursorForTesting
29 def _with_srid(geom, default=None):
31 return None if default is None else f"SRID=4326;{default}"
33 return f"SRID=4326;{geom}"
42 def temp_db(monkeypatch):
43 """ Create an empty database for the test. The database name is also
44 exported into NOMINATIM_DATABASE_DSN.
46 name = 'test_nominatim_python_unittest'
48 with psycopg.connect(dbname='postgres', autocommit=True) as conn:
49 with conn.cursor() as cur:
50 cur.execute(pysql.SQL('DROP DATABASE IF EXISTS') + pysql.Identifier(name))
51 cur.execute(pysql.SQL('CREATE DATABASE') + pysql.Identifier(name))
53 monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)
55 with psycopg.connect(dbname=name) as conn:
56 with conn.cursor() as cur:
57 cur.execute('CREATE EXTENSION hstore')
61 with psycopg.connect(dbname='postgres', autocommit=True) as conn:
62 with conn.cursor() as cur:
63 cur.execute(pysql.SQL('DROP DATABASE IF EXISTS') + pysql.Identifier(name))
68 return 'dbname=' + temp_db
72 def temp_db_with_extensions(temp_db):
73 with psycopg.connect(dbname=temp_db) as conn:
74 with conn.cursor() as cur:
75 cur.execute('CREATE EXTENSION postgis')
81 def temp_db_conn(temp_db):
82 """ Connection to the test database.
84 with connection.connect('', autocommit=True, dbname=temp_db) as conn:
85 connection.register_hstore(conn)
90 def temp_db_cursor(temp_db):
91 """ Connection and cursor towards the test database. The connection will
92 be in auto-commit mode.
94 with psycopg.connect(dbname=temp_db, autocommit=True, cursor_factory=CursorForTesting) as conn:
95 connection.register_hstore(conn)
96 with conn.cursor() as cur:
101 def table_factory(temp_db_conn):
102 """ A fixture that creates new SQL tables, potentially filled with
105 def mk_table(name, definition='id INT', content=None):
106 with psycopg.ClientCursor(temp_db_conn) as cur:
107 cur.execute(pysql.SQL("CREATE TABLE {} ({})")
108 .format(pysql.Identifier(name),
109 pysql.SQL(definition)))
111 sql = pysql.SQL("INSERT INTO {} VALUES ({})")\
112 .format(pysql.Identifier(name),
113 pysql.SQL(',').join([pysql.Placeholder()
114 for _ in range(len(content[0]))]))
115 cur.executemany(sql, content)
122 cfg = Configuration(None)
127 def project_env(tmp_path):
128 projdir = tmp_path / 'project'
130 cfg = Configuration(projdir)
135 def country_table(table_factory):
136 table_factory('country_name', 'partition INT, country_code varchar(2), name hstore')
140 def country_row(country_table, temp_db_cursor):
141 def _add(partition=None, country=None, names=None):
142 temp_db_cursor.insert_row('country_name', partition=partition,
143 country_code=country, name=names)
149 def load_sql(temp_db_conn, country_row):
150 proc = SQLPreprocessor(temp_db_conn, Configuration(None))
152 def _run(filename, **kwargs):
153 proc.run_sql_file(temp_db_conn, filename, **kwargs)
159 def property_table(load_sql, temp_db_conn):
160 load_sql('tables/nominatim_properties.sql')
163 def set(self, name, value):
164 properties.set_property(temp_db_conn, name, value)
167 return properties.get_property(temp_db_conn, name)
173 def status_table(load_sql):
174 """ Create an empty version of the status table and
175 the status logging table.
177 load_sql('tables/status.sql')
181 def place_table(temp_db_with_extensions, table_factory):
182 """ Create an empty version of the place table.
184 table_factory('place',
185 """osm_id int8 NOT NULL,
186 osm_type char(1) NOT NULL,
190 admin_level smallint,
193 geometry Geometry(Geometry,4326) NOT NULL""")
197 def place_row(place_table, temp_db_cursor):
198 """ A factory for rows in the place table. The table is created as a
199 prerequisite to the fixture.
201 idseq = itertools.count(1001)
203 def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
204 admin_level=None, address=None, extratags=None, geom='POINT(0 0)'):
205 args = {'osm_type': osm_type, 'osm_id': osm_id or next(idseq),
206 'class': cls, 'type': typ, 'name': names, 'admin_level': admin_level,
207 'address': address, 'extratags': extratags,
208 'geometry': _with_srid(geom)}
209 temp_db_cursor.insert_row('place', **args)
215 def place_postcode_table(temp_db_with_extensions, table_factory):
216 """ Create an empty version of the place_postcode table.
218 table_factory('place_postcode',
219 """osm_type char(1) NOT NULL,
220 osm_id bigint NOT NULL,
221 postcode text NOT NULL,
223 centroid Geometry(Point, 4326) NOT NULL,
224 geometry Geometry(Geometry, 4326)""")
228 def place_postcode_row(place_postcode_table, temp_db_cursor):
229 """ A factory for rows in the place_postcode table. The table is created as a
230 prerequisite to the fixture.
232 idseq = itertools.count(5001)
234 def _insert(osm_type='N', osm_id=None, postcode=None, country=None,
235 centroid='POINT(12.0 4.0)', geom=None):
236 temp_db_cursor.insert_row('place_postcode',
237 osm_type=osm_type, osm_id=osm_id or next(idseq),
238 postcode=postcode, country_code=country,
239 centroid=_with_srid(centroid),
240 geometry=_with_srid(geom))
246 def placex_table(temp_db_with_extensions, temp_db_conn):
247 """ Create an empty version of the place table.
249 return mocks.MockPlacexTable(temp_db_conn)
253 def osmline_table(temp_db_with_extensions, load_sql):
254 load_sql('tables/interpolation.sql')
258 def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions, country_row):
259 for part in range(3):
260 country_row(partition=part)
262 cfg = Configuration(None)
263 cfg.set_libdirs(sql=tmp_path)
268 def sql_preprocessor(sql_preprocessor_cfg, temp_db_conn):
269 return SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg)
273 def tokenizer_mock(monkeypatch, property_table):
274 """ Sets up the configuration so that the test dummy tokenizer will be
275 loaded when the tokenizer factory is used. Also returns a factory
276 with which a new dummy tokenizer may be created.
278 monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
280 def _import_dummy(*args, **kwargs):
281 return dummy_tokenizer
283 monkeypatch.setattr(nominatim_db.tokenizer.factory,
284 "_import_tokenizer", _import_dummy)
285 property_table.set('tokenizer', 'dummy')
287 def _create_tokenizer():
288 return dummy_tokenizer.DummyTokenizer(None)
290 return _create_tokenizer