1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2026 by the Nominatim developer community.
6 # For a full list of authors see the git log.
10 from pathlib import Path
# NOTE(review): presumably required because psycopg's asyncio support does
# not work with the proactor event loop that is the default on Windows --
# confirm against the psycopg documentation.
if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
16 from psycopg import sql as pysql
# Always test against the source: SRC_DIR is the directory three levels
# above this file and its 'src' subdirectory is put at the very front of
# the module search path.
SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
sys.path.insert(0, str(SRC_DIR / 'src'))
23 from nominatim_db.config import Configuration
24 from nominatim_db.db import connection, properties
25 from nominatim_db.db.sql_preprocessor import SQLPreprocessor
26 import nominatim_db.tokenizer.factory
28 import dummy_tokenizer
29 from cursor import CursorForTesting
32 def _with_srid(geom, default=None):
34 return None if default is None else f"SRID=4326;{default}"
36 return f"SRID=4326;{geom}"
def temp_db(monkeypatch):
    """ Create an empty database for the test. The database name is also
        exported into NOMINATIM_DATABASE_DSN.

        The hstore extension is installed into the new database. The
        fixture yields the name of the database and drops the database
        again after the test.
    """
    name = 'test_nominatim_python_unittest'

    # CREATE/DROP DATABASE cannot be executed inside a transaction block,
    # hence the auto-commit connection to the 'postgres' database.
    with psycopg.connect(dbname='postgres', autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute(pysql.SQL('DROP DATABASE IF EXISTS') + pysql.Identifier(name))
            cur.execute(pysql.SQL('CREATE DATABASE') + pysql.Identifier(name))

    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)

    with psycopg.connect(dbname=name) as conn:
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION hstore')

    yield name

    # Teardown: remove the test database again.
    with psycopg.connect(dbname='postgres', autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute(pysql.SQL('DROP DATABASE IF EXISTS') + pysql.Identifier(name))
71 return 'dbname=' + temp_db
def temp_db_with_extensions(temp_db):
    """ Install the postgis extension into the test database.

        Returns the name of the test database, same as `temp_db`.
    """
    with psycopg.connect(dbname=temp_db) as conn:
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION postgis')

    return temp_db
def temp_db_conn(temp_db):
    """ Connection to the test database.

        The connection is opened in auto-commit mode and hstore is
        registered on it before it is handed to the test. The enclosing
        with-block is left once the test is done.
    """
    with connection.connect('', autocommit=True, dbname=temp_db) as conn:
        connection.register_hstore(conn)
        yield conn
def temp_db_cursor(temp_db):
    """ Connection and cursor towards the test database. The connection will
        be in auto-commit mode.

        The cursor is created through the CursorForTesting cursor factory
        and hstore is registered on the connection.
    """
    with psycopg.connect(dbname=temp_db, autocommit=True, cursor_factory=CursorForTesting) as conn:
        connection.register_hstore(conn)
        with conn.cursor() as cur:
            yield cur
def table_factory(temp_db_conn):
    """ A fixture that creates new SQL tables, potentially filled with
        content.

        Returns the function `mk_table(name, definition, content)`.
        `definition` is inserted verbatim between the parentheses of the
        CREATE TABLE statement. `content`, when given and not empty, is
        a sequence of rows which are inserted positionally; the number of
        columns is taken from the first row.
    """
    def mk_table(name, definition='id INT', content=None):
        with psycopg.ClientCursor(temp_db_conn) as cur:
            cur.execute(pysql.SQL("CREATE TABLE {} ({})")
                             .format(pysql.Identifier(name),
                                     pysql.SQL(definition)))
            # Nothing to insert for None or an empty row list.
            if content:
                placeholders = pysql.SQL(',').join([pysql.Placeholder()
                                                    for _ in content[0]])
                sql = pysql.SQL("INSERT INTO {} VALUES ({})")\
                           .format(pysql.Identifier(name), placeholders)
                cur.executemany(sql, content)

    return mk_table
125 cfg = Configuration(None)
130 def project_env(tmp_path):
131 projdir = tmp_path / 'project'
133 cfg = Configuration(projdir)
def country_table(table_factory):
    """ Create an empty version of the country_name table with the
        columns partition, country_code and name.
    """
    table_factory('country_name', 'partition INT, country_code varchar(2), name hstore')
def country_row(country_table, temp_db_cursor):
    """ A factory for rows in the country_name table. The table is created
        as a prerequisite to the fixture.

        The returned function takes the optional arguments `partition`,
        `country` (stored in column country_code) and `names` (stored in
        column name). Each of them defaults to None.
    """
    def _add(partition=None, country=None, names=None):
        temp_db_cursor.insert_row('country_name', partition=partition,
                                  country_code=country, name=names)

    return _add
def load_sql(temp_db_conn, country_row):
    """ Return a function that runs an SQL file against the test database.

        The file is processed by an SQLPreprocessor that is set up with a
        default Configuration. Additional keyword arguments are handed
        through to `run_sql_file()`.
    """
    proc = SQLPreprocessor(temp_db_conn, Configuration(None))

    def _run(filename, **kwargs):
        proc.run_sql_file(temp_db_conn, filename, **kwargs)

    return _run
def property_table(load_sql, temp_db_conn):
    """ Create the table from 'tables/nominatim_properties.sql' and return
        an object with `set(name, value)` and `get(name)` methods, which
        write and read properties through the test database connection.
    """
    load_sql('tables/nominatim_properties.sql')

    class _PropTable:
        def set(self, name, value):
            properties.set_property(temp_db_conn, name, value)

        def get(self, name):
            return properties.get_property(temp_db_conn, name)

    return _PropTable()
def status_table(load_sql):
    """ Create an empty version of the status table and
        the status logging table.
    """
    load_sql('tables/status.sql')
184 def place_table(temp_db_with_extensions, table_factory):
185 """ Create an empty version of the place table.
187 table_factory('place',
188 """osm_id int8 NOT NULL,
189 osm_type char(1) NOT NULL,
193 admin_level smallint,
196 geometry Geometry(Geometry,4326) NOT NULL""")
def place_row(place_table, temp_db_cursor):
    """ A factory for rows in the place table. The table is created as a
        prerequisite to the fixture.

        Rows without an explicit `osm_id` get a running id starting at
        1001. The geometry is given as WKT and is inserted with SRID 4326.
    """
    idseq = itertools.count(1001)

    def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
                admin_level=None, address=None, extratags=None, geom='POINT(0 0)'):
        # 'class' is a reserved word in Python, so the column values are
        # collected in a dictionary and expanded into keyword arguments.
        args = {'osm_type': osm_type, 'osm_id': osm_id or next(idseq),
                'class': cls, 'type': typ, 'name': names, 'admin_level': admin_level,
                'address': address, 'extratags': extratags,
                'geometry': _with_srid(geom)}
        temp_db_cursor.insert_row('place', **args)

    return _insert
218 def place_postcode_table(temp_db_with_extensions, table_factory):
219 """ Create an empty version of the place_postcode table.
221 table_factory('place_postcode',
222 """osm_type char(1) NOT NULL,
223 osm_id bigint NOT NULL,
224 postcode text NOT NULL,
226 centroid Geometry(Point, 4326) NOT NULL,
227 geometry Geometry(Geometry, 4326)""")
def place_postcode_row(place_postcode_table, temp_db_cursor):
    """ A factory for rows in the place_postcode table. The table is created as a
        prerequisite to the fixture.

        Rows without an explicit `osm_id` get a running id starting at
        5001. `centroid` and `geom` are given as WKT and are inserted with
        SRID 4326. A `geom` of None is handed on as None.
    """
    idseq = itertools.count(5001)

    def _insert(osm_type='N', osm_id=None, postcode=None, country=None,
                centroid='POINT(12.0 4.0)', geom=None):
        temp_db_cursor.insert_row('place_postcode',
                                  osm_type=osm_type, osm_id=osm_id or next(idseq),
                                  postcode=postcode, country_code=country,
                                  centroid=_with_srid(centroid),
                                  geometry=_with_srid(geom))

    return _insert
def placex_table(temp_db_with_extensions, temp_db_conn, load_sql, place_table):
    """ Create an empty version of the placex table.

        Also makes sure that the sequence seq_place exists.
    """
    load_sql('tables/placex.sql')
    temp_db_conn.execute("CREATE SEQUENCE IF NOT EXISTS seq_place START 1")
def placex_row(placex_table, temp_db_cursor):
    """ A factory for rows in the placex table. The table is created as a
        prerequisite to the fixture.

        Rows without an explicit `osm_id` get a running id starting at
        1001. The place_id is drawn from the sequence seq_place. The
        returned function hands back the result of `insert_row()`.
    """
    idseq = itertools.count(1001)

    def _add(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
             admin_level=None, address=None, extratags=None, geom='POINT(10 4)',
             country=None, housenumber=None, rank_search=30, rank_address=30,
             centroid='POINT(10 4)', indexed_status=0, indexed_date=None):
        # 'class' is a reserved word in Python, so the column values are
        # collected in a dictionary and expanded into keyword arguments.
        args = {'place_id': pysql.SQL("nextval('seq_place')"),
                'osm_type': osm_type, 'osm_id': osm_id or next(idseq),
                'class': cls, 'type': typ, 'name': names, 'admin_level': admin_level,
                'address': address, 'housenumber': housenumber,
                'rank_search': rank_search, 'rank_address': rank_address,
                'extratags': extratags,
                'centroid': _with_srid(centroid), 'geometry': _with_srid(geom),
                'country_code': country,
                'indexed_status': indexed_status, 'indexed_date': indexed_date,
                'partition': pysql.Literal(0), 'geometry_sector': pysql.Literal(1)}
        return temp_db_cursor.insert_row('placex', **args)

    return _add
def osmline_table(temp_db_with_extensions, load_sql):
    """ Create the tables defined in 'tables/interpolation.sql'.
    """
    load_sql('tables/interpolation.sql')
288 def osmline_row(osmline_table, temp_db_cursor):
289 idseq = itertools.count(20001)
291 def _add(osm_id=None, geom='LINESTRING(12.0 11.0, 12.003 11.0)'):
292 return temp_db_cursor.insert_row(
293 'location_property_osmline',
294 place_id=pysql.SQL("nextval('seq_place')"),
295 osm_id=osm_id or next(idseq),
296 geometry_sector=pysql.Literal(20),
297 partition=pysql.Literal(0),
299 linegeo=_with_srid(geom))
def postcode_table(temp_db_with_extensions, load_sql):
    """ Create the tables defined in 'tables/postcodes.sql'.
    """
    load_sql('tables/postcodes.sql')
310 def postcode_row(postcode_table, temp_db_cursor):
311 def _add(country, postcode, x=34.5, y=-9.33):
312 geom = _with_srid(f"POINT({x} {y})")
313 return temp_db_cursor.insert_row(
314 'location_postcodes',
315 place_id=pysql.SQL("nextval('seq_place')"),
316 indexed_status=pysql.Literal(1),
317 country_code=country, postcode=postcode,
319 rank_search=pysql.Literal(16),
320 geometry=('ST_Expand(%s::geometry, 0.005)', geom))
def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions, country_row):
    """ Return a default Configuration whose SQL library directory points
        to `tmp_path`.

        Three rows with the partitions 0, 1 and 2 are added to the
        country_name table beforehand.
    """
    for part in range(3):
        country_row(partition=part)

    cfg = Configuration(None)
    cfg.set_libdirs(sql=tmp_path)
    return cfg
def sql_preprocessor(sql_preprocessor_cfg, temp_db_conn):
    """ Return an SQLPreprocessor for the test database that uses the
        configuration from `sql_preprocessor_cfg`.
    """
    return SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg)
def tokenizer_mock(monkeypatch, property_table):
    """ Sets up the configuration so that the test dummy tokenizer will be
        loaded when the tokenizer factory is used. Also returns a factory
        with which a new dummy tokenizer may be created.
    """
    monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')

    # Whatever module the factory asks for, hand out the dummy tokenizer.
    def _import_dummy(*args, **kwargs):
        return dummy_tokenizer

    monkeypatch.setattr(nominatim_db.tokenizer.factory,
                        "_import_tokenizer", _import_dummy)
    property_table.set('tokenizer', 'dummy')

    def _create_tokenizer():
        return dummy_tokenizer.DummyTokenizer(None)

    return _create_tokenizer