1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Fixtures for BDD test steps
12 from pathlib import Path
15 from psycopg import sql as pysql
17 # always test against the source
18 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
19 sys.path.insert(0, str(SRC_DIR / 'src'))
22 from pytest_bdd.parsers import re as step_parse
23 from pytest_bdd import given, when, then
25 pytest.register_assert_rewrite('utils')
27 from utils.api_runner import APIRunner
28 from utils.api_result import APIResult
29 from utils.checks import ResultAttr, COMPARATOR_TERMS
30 from utils.geometry_alias import ALIASES
31 from utils.grid import Grid
32 from utils.db import DBManager
34 from nominatim_db.config import Configuration
35 from nominatim_db.data.country_info import setup_country_config
39 return [s.strip() for s in inp.split(',')]
42 def _pretty_json(inp):
43 return json.dumps(inp, indent=2)
46 def pytest_addoption(parser, pluginmanager):
47 parser.addoption('--nominatim-purge', dest='NOMINATIM_PURGE', action='store_true',
48 help='Force recreation of test databases from scratch.')
49 parser.addoption('--nominatim-keep-db', dest='NOMINATIM_KEEP_DB', action='store_true',
50 help='Do not drop the database after tests are finished.')
51 parser.addoption('--nominatim-api-engine', dest='NOMINATIM_API_ENGINE',
53 help='Chose the API engine to use when sending requests.')
54 parser.addoption('--nominatim-tokenizer', dest='NOMINATIM_TOKENIZER',
56 help='Use the specified tokenizer for importing data into '
57 'a Nominatim database.')
59 parser.addini('nominatim_test_db', default='test_nominatim',
60 help='Name of the database used for running a single test.')
61 parser.addini('nominatim_api_test_db', default='test_api_nominatim',
62 help='Name of the database for storing API test data.')
63 parser.addini('nominatim_template_db', default='test_template_nominatim',
64 help='Name of database used as a template for test databases.')
69 """ Default fixture for datatables, so that their presence can be optional.
76 """ Default fixture for node grids. Nothing set.
78 return Grid([[]], None, None)
81 @pytest.fixture(scope='session', autouse=True)
82 def setup_country_info():
83 setup_country_config(Configuration(None))
86 @pytest.fixture(scope='session')
87 def template_db(pytestconfig):
88 """ Create a template database containing the extensions and base data
89 needed by Nominatim. Using the template instead of doing the full
90 setup can speed up the tests.
92 The template database will only be created if it does not exist yet
93 or a purge has been explicitly requested.
95 dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
97 template_db = pytestconfig.getini('nominatim_template_db')
99 template_config = Configuration(
100 None, environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={template_db}"})
102 dbm.setup_template_db(template_config)
108 def def_config(pytestconfig):
109 dbname = pytestconfig.getini('nominatim_test_db')
111 return Configuration(None,
112 environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={dbname}"})
116 def db(template_db, pytestconfig):
117 """ Set up an empty database for use with osm2pgsql.
119 dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
121 dbname = pytestconfig.getini('nominatim_test_db')
123 dbm.create_db_from_template(dbname, template_db)
127 if not pytestconfig.option.NOMINATIM_KEEP_DB:
132 def db_conn(db, def_config):
133 with psycopg.connect(def_config.get_libpq_dsn()) as conn:
134 info = psycopg.types.TypeInfo.fetch(conn, "hstore")
135 psycopg.types.hstore.register_hstore(info, conn)
139 @when(step_parse(r'reverse geocoding (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)'),
140 target_fixture='nominatim_result')
141 def reverse_geocode_via_api(test_config_env, pytestconfig, datatable, lat, lon):
142 runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
143 api_response = runner.run_step('reverse',
144 {'lat': float(lat), 'lon': float(lon)},
145 datatable, 'jsonv2', {})
147 assert api_response.status == 200
148 assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
150 result = APIResult('json', 'reverse', api_response.body)
151 assert result.is_simple()
153 assert isinstance(result.result['lat'], str)
154 assert isinstance(result.result['lon'], str)
155 result.result['centroid'] = f"POINT({result.result['lon']} {result.result['lat']})"
160 @when(step_parse(r'reverse geocoding at node (?P<node>[\d]+)'),
161 target_fixture='nominatim_result')
162 def reverse_geocode_via_api_and_grid(test_config_env, pytestconfig, node_grid, datatable, node):
163 coords = node_grid.get(node)
165 raise ValueError('Unknown node id')
167 return reverse_geocode_via_api(test_config_env, pytestconfig, datatable, coords[1], coords[0])
170 @when(step_parse(r'geocoding(?: "(?P<query>.*)")?'),
171 target_fixture='nominatim_result')
172 def forward_geocode_via_api(test_config_env, pytestconfig, datatable, query):
173 runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
175 params = {'addressdetails': '1'}
179 api_response = runner.run_step('search', params, datatable, 'jsonv2', {})
181 assert api_response.status == 200
182 assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
184 result = APIResult('json', 'search', api_response.body)
185 assert not result.is_simple()
187 for res in result.result:
188 assert isinstance(res['lat'], str)
189 assert isinstance(res['lon'], str)
190 res['centroid'] = f"POINT({res['lon']} {res['lat']})"
195 @then(step_parse(r'(?P<op>[a-z ]+) (?P<num>\d+) results? (?:are|is) returned'),
196 converters={'num': int})
197 def check_number_of_results(nominatim_result, op, num):
198 assert not nominatim_result.is_simple()
199 assert COMPARATOR_TERMS[op](num, len(nominatim_result))
202 @then(step_parse('the result metadata contains'))
203 def check_metadata_for_fields(nominatim_result, datatable):
204 if datatable[0] == ['param', 'value']:
205 pairs = datatable[1:]
207 pairs = zip(datatable[0], datatable[1])
210 assert ResultAttr(nominatim_result.meta, k) == v
213 @then(step_parse('the result metadata has no attributes (?P<attributes>.*)'),
214 converters={'attributes': _strlist})
215 def check_metadata_for_field_presence(nominatim_result, attributes):
216 assert all(a not in nominatim_result.meta for a in attributes), \
217 f"Unexpectedly have one of the attributes '{attributes}' in\n" \
218 f"{_pretty_json(nominatim_result.meta)}"
221 @then(step_parse(r'the result contains(?: in field (?P<field>\S+))?'))
222 def check_result_for_fields(nominatim_result, datatable, node_grid, field):
223 assert nominatim_result.is_simple()
225 if datatable[0] == ['param', 'value']:
226 pairs = datatable[1:]
228 pairs = zip(datatable[0], datatable[1])
230 prefix = field + '+' if field else ''
233 assert ResultAttr(nominatim_result.result, prefix + k, grid=node_grid) == v
236 @then(step_parse('the result has attributes (?P<attributes>.*)'),
237 converters={'attributes': _strlist})
238 def check_result_for_field_presence(nominatim_result, attributes):
239 assert nominatim_result.is_simple()
240 assert all(a in nominatim_result.result for a in attributes)
243 @then(step_parse('the result has no attributes (?P<attributes>.*)'),
244 converters={'attributes': _strlist})
245 def check_result_for_field_absence(nominatim_result, attributes):
246 assert nominatim_result.is_simple()
247 assert all(a not in nominatim_result.result for a in attributes)
251 r'the result contains array field (?P<field>\S+) where element (?P<num>\d+) contains'),
252 converters={'num': int})
253 def check_result_array_field_for_attributes(nominatim_result, datatable, field, num):
254 assert nominatim_result.is_simple()
256 if datatable[0] == ['param', 'value']:
257 pairs = datatable[1:]
259 pairs = zip(datatable[0], datatable[1])
261 prefix = f"{field}+{num}+"
264 assert ResultAttr(nominatim_result.result, prefix + k) == v
267 @then(step_parse('the result set contains(?P<exact> exactly)?'))
268 def check_result_list_match(nominatim_result, datatable, exact):
269 assert not nominatim_result.is_simple()
271 result_set = set(range(len(nominatim_result.result)))
273 for row in datatable[1:]:
274 for idx in result_set:
275 for key, value in zip(datatable[0], row):
276 if ResultAttr(nominatim_result.result[idx], key) != value:
280 result_set.remove(idx)
283 assert False, f"Missing data row {row}. Full response:\n{nominatim_result}"
286 assert not [nominatim_result.result[i] for i in result_set]
289 @then(step_parse('all results have attributes (?P<attributes>.*)'),
290 converters={'attributes': _strlist})
291 def check_all_results_for_field_presence(nominatim_result, attributes):
292 assert not nominatim_result.is_simple()
293 assert len(nominatim_result) > 0
294 for res in nominatim_result.result:
295 assert all(a in res for a in attributes), \
296 f"Missing one of the attributes '{attributes}' in\n{_pretty_json(res)}"
299 @then(step_parse('all results have no attributes (?P<attributes>.*)'),
300 converters={'attributes': _strlist})
301 def check_all_result_for_field_absence(nominatim_result, attributes):
302 assert not nominatim_result.is_simple()
303 assert len(nominatim_result) > 0
304 for res in nominatim_result.result:
305 assert all(a not in res for a in attributes), \
306 f"Unexpectedly have one of the attributes '{attributes}' in\n{_pretty_json(res)}"
309 @then(step_parse(r'all results contain(?: in field (?P<field>\S+))?'))
310 def check_all_results_contain(nominatim_result, datatable, node_grid, field):
311 assert not nominatim_result.is_simple()
312 assert len(nominatim_result) > 0
314 if datatable[0] == ['param', 'value']:
315 pairs = datatable[1:]
317 pairs = zip(datatable[0], datatable[1])
319 prefix = field + '+' if field else ''
322 for r in nominatim_result.result:
323 assert ResultAttr(r, prefix + k, grid=node_grid) == v
326 @then(step_parse(r'result (?P<num>\d+) contains(?: in field (?P<field>\S+))?'),
327 converters={'num': int})
328 def check_specific_result_for_fields(nominatim_result, datatable, num, field):
329 assert not nominatim_result.is_simple()
330 assert len(nominatim_result) > num
332 if datatable[0] == ['param', 'value']:
333 pairs = datatable[1:]
335 pairs = zip(datatable[0], datatable[1])
337 prefix = field + '+' if field else ''
340 assert ResultAttr(nominatim_result.result[num], prefix + k) == v
343 @given(step_parse(r'the (?P<step>[0-9.]+ )?grid(?: with origin (?P<origin>.*))?'),
344 target_fixture='node_grid')
345 def set_node_grid(datatable, step, origin):
351 coords = origin.split(',')
353 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
354 origin = list(map(float, coords))
355 elif origin in ALIASES:
356 origin = ALIASES[origin]
358 raise RuntimeError('Grid origin must be either coordinate or alias.')
360 return Grid(datatable, step, origin)
363 @then(step_parse('(?P<table>placex?) has no entry for '
364 r'(?P<osm_type>[NRW])(?P<osm_id>\d+)(?::(?P<osm_class>\S+))?'),
365 converters={'osm_id': int})
366 def check_place_missing_lines(db_conn, table, osm_type, osm_id, osm_class):
367 sql = pysql.SQL("""SELECT count(*) FROM {}
368 WHERE osm_type = %s and osm_id = %s""").format(pysql.Identifier(table))
369 params = [osm_type, int(osm_id)]
371 sql += pysql.SQL(' AND class = %s')
372 params.append(osm_class)
374 with db_conn.cursor() as cur:
375 assert cur.execute(sql, params).fetchone()[0] == 0