1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Fixtures for BDD test steps
13 from pathlib import Path
16 from psycopg import sql as pysql
18 # always test against the source
19 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
20 sys.path.insert(0, str(SRC_DIR / 'src'))
23 from pytest_bdd.parsers import re as step_parse
24 from pytest_bdd import given, when, then, scenario
25 from pytest_bdd.feature import get_features
27 pytest.register_assert_rewrite('utils')
29 from utils.api_runner import APIRunner
30 from utils.api_result import APIResult
31 from utils.checks import ResultAttr, COMPARATOR_TERMS
32 from utils.geometry_alias import ALIASES
33 from utils.grid import Grid
34 from utils.db import DBManager
36 from nominatim_db.config import Configuration
37 from nominatim_db.data.country_info import setup_country_config
41 return [s.strip() for s in inp.split(',')]
44 def _pretty_json(inp):
45 return json.dumps(inp, indent=2)
def pytest_addoption(parser, pluginmanager):
    """ Register the Nominatim-specific command-line options and
        ini-file settings with pytest.
    """
    # Boolean switches.
    switches = (
        ('--nominatim-purge', 'NOMINATIM_PURGE',
         'Force recreation of test databases from scratch.'),
        ('--nominatim-keep-db', 'NOMINATIM_KEEP_DB',
         'Do not drop the database after tests are finished.'),
    )
    for flag, dest, description in switches:
        parser.addoption(flag, dest=dest, action='store_true', help=description)

    # Options that take a value.
    valued_options = (
        ('--nominatim-api-engine', 'NOMINATIM_API_ENGINE',
         'Chose the API engine to use when sending requests.'),
        ('--nominatim-tokenizer', 'NOMINATIM_TOKENIZER',
         'Use the specified tokenizer for importing data into '
         'a Nominatim database.'),
    )
    for flag, dest, description in valued_options:
        parser.addoption(flag, dest=dest, help=description)

    # Names of the databases used by the tests, configurable via ini file.
    ini_settings = (
        ('nominatim_test_db', 'test_nominatim',
         'Name of the database used for running a single test.'),
        ('nominatim_api_test_db', 'test_api_nominatim',
         'Name of the database for storing API test data.'),
        ('nominatim_template_db', 'test_template_nominatim',
         'Name of database used as a template for test databases.'),
    )
    for name, default, description in ini_settings:
        parser.addini(name, default=default, help=description)
71 """ Default fixture for datatables, so that their presence can be optional.
@pytest.fixture
def node_grid():
    """ Default fixture for node grids. Nothing set.
    """
    return Grid([[]], None, None)
@pytest.fixture(scope='session', autouse=True)
def setup_country_info():
    """ Load the country configuration once per test session, using
        a configuration created without a project directory.
    """
    default_config = Configuration(None)
    setup_country_config(default_config)
@pytest.fixture(scope='session')
def template_db(pytestconfig):
    """ Create a template database containing the extensions and base data
        needed by Nominatim. Using the template instead of doing the full
        setup can speed up the tests.

        The template database will only be created if it does not exist yet
        or a purge has been explicitly requested.

        Returns the name of the template database.
    """
    dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)

    # Use a name distinct from the fixture function to avoid shadowing it.
    template_name = pytestconfig.getini('nominatim_template_db')

    template_config = Configuration(
        None, environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={template_name}"})

    dbm.setup_template_db(template_config)

    # Dependent fixtures use the fixture value as the name of the
    # template to copy from.
    return template_name
@pytest.fixture
def def_config(pytestconfig):
    """ Return a configuration whose database DSN points to the test
        database named by the 'nominatim_test_db' ini setting.
    """
    dbname = pytestconfig.getini('nominatim_test_db')

    return Configuration(None,
                         environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={dbname}"})
@pytest.fixture
def db(template_db, pytestconfig):
    """ Set up an empty database for use with osm2pgsql.

        The database is created from the template database. Unless
        '--nominatim-keep-db' was given, it is removed again after the test.
    """
    dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)

    dbname = pytestconfig.getini('nominatim_test_db')

    dbm.create_db_from_template(dbname, template_db)

    yield dbname

    if not pytestconfig.option.NOMINATIM_KEEP_DB:
        # NOTE(review): clean-up call reconstructed; confirm that DBManager
        # offers drop_db() with this signature.
        dbm.drop_db(dbname)
@pytest.fixture
def db_conn(db, def_config):
    """ Provide an open connection to the test database with the
        hstore type registered. The connection is closed after the test.
    """
    with psycopg.connect(def_config.get_libpq_dsn()) as conn:
        info = psycopg.types.TypeInfo.fetch(conn, "hstore")
        psycopg.types.hstore.register_hstore(info, conn)
        # Hand out the connection from inside the context manager, so that
        # it stays open for the duration of the test.
        yield conn
@when(step_parse(r'reverse geocoding (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)'),
      target_fixture='nominatim_result')
def reverse_geocode_via_api(test_config_env, pytestconfig, datatable, lat, lon):
    """ Send a reverse request for the given coordinates to the API.

        Checks that the response has status 200 and a JSON content type
        and that it consists of a single result. Returns the parsed result
        with an additional 'centroid' entry of the form 'POINT(lon lat)'.
    """
    runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
    api_response = runner.run_step('reverse',
                                   {'lat': float(lat), 'lon': float(lon)},
                                   datatable, 'jsonv2', {})

    assert api_response.status == 200
    assert api_response.headers['content-type'] == 'application/json; charset=utf-8'

    result = APIResult('json', 'reverse', api_response.body)
    assert result.is_simple()

    assert isinstance(result.result['lat'], str)
    assert isinstance(result.result['lon'], str)
    result.result['centroid'] = f"POINT({result.result['lon']} {result.result['lat']})"

    # The return value becomes the 'nominatim_result' fixture.
    return result
@when(step_parse(r'reverse geocoding at node (?P<node>[\d]+)'),
      target_fixture='nominatim_result')
def reverse_geocode_via_api_and_grid(test_config_env, pytestconfig, node_grid, datatable, node):
    """ Send a reverse request to the API for the position of the given
        node of the node grid.

        Raises a ValueError when the node is not part of the grid.
    """
    coords = node_grid.get(node)
    if coords is None:
        raise ValueError('Unknown node id')

    # The second coordinate is handed in as latitude, the first as longitude.
    return reverse_geocode_via_api(test_config_env, pytestconfig, datatable, coords[1], coords[0])
@when(step_parse(r'geocoding(?: "(?P<query>.*)")?'),
      target_fixture='nominatim_result')
def forward_geocode_via_api(test_config_env, pytestconfig, datatable, query):
    """ Send a search request to the API, optionally with a free-text query.

        Checks that the response has status 200 and a JSON content type
        and that it consists of a list of results. Returns the parsed
        results, each with an additional 'centroid' entry of the form
        'POINT(lon lat)'.
    """
    runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)

    params = {'addressdetails': '1'}
    # NOTE(review): hand-over of the query reconstructed; 'q' is the name of
    # the free-text parameter of the search API - confirm.
    if query:
        params['q'] = query

    api_response = runner.run_step('search', params, datatable, 'jsonv2', {})

    assert api_response.status == 200
    assert api_response.headers['content-type'] == 'application/json; charset=utf-8'

    result = APIResult('json', 'search', api_response.body)
    assert not result.is_simple()

    for res in result.result:
        assert isinstance(res['lat'], str)
        assert isinstance(res['lon'], str)
        res['centroid'] = f"POINT({res['lon']} {res['lat']})"

    # The return value becomes the 'nominatim_result' fixture.
    return result
@then(step_parse(r'(?P<op>[a-z ]+) (?P<num>\d+) results? (?:are|is) returned'),
      converters={'num': int})
def check_number_of_results(nominatim_result, op, num):
    """ Compare the number of returned results with the expected number
        using the comparator that is registered under the term 'op'.
    """
    assert not nominatim_result.is_simple()

    compare = COMPARATOR_TERMS[op]
    actual = len(nominatim_result)
    assert compare(num, actual)
@then(step_parse('the result metadata contains'))
def check_metadata_for_fields(nominatim_result, datatable):
    """ Check the metadata of the result against the expected values
        from the data table.

        The table may either have a 'param'/'value' header followed by
        one row per attribute, or consist of a row with the attribute
        names followed by a row with the expected values.
    """
    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    for k, v in pairs:
        assert ResultAttr(nominatim_result.meta, k) == v
@then(step_parse('the result metadata has no attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_metadata_for_field_presence(nominatim_result, attributes):
    """ Check that none of the listed attributes appears in the
        metadata of the result.
    """
    assert not any(name in nominatim_result.meta for name in attributes), \
        f"Unexpectedly have one of the attributes '{attributes}' in\n" \
        f"{_pretty_json(nominatim_result.meta)}"
@then(step_parse(r'the result contains(?: in field (?P<field>\S+))?'))
def check_result_for_fields(nominatim_result, datatable, node_grid, field):
    """ Check the attributes of a single result against the expected
        values from the data table.

        The table may either have a 'param'/'value' header followed by
        one row per attribute, or consist of a row with the attribute
        names followed by a row with the expected values. When 'field'
        is given, it is prepended to each attribute name, joined with '+'.
    """
    assert nominatim_result.is_simple()

    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    prefix = field + '+' if field else ''

    for k, v in pairs:
        assert ResultAttr(nominatim_result.result, prefix + k, grid=node_grid) == v
@then(step_parse('the result has attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_result_for_field_presence(nominatim_result, attributes):
    """ Check that all of the listed attributes are present in the
        single result.
    """
    assert nominatim_result.is_simple()
    # Report the offending result on failure, like the corresponding
    # check for result lists does.
    assert all(a in nominatim_result.result for a in attributes), \
        f"Missing one of the attributes '{attributes}' in\n" \
        f"{_pretty_json(nominatim_result.result)}"
@then(step_parse('the result has no attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_result_for_field_absence(nominatim_result, attributes):
    """ Check that none of the listed attributes is present in the
        single result.
    """
    assert nominatim_result.is_simple()
    # Report the offending result on failure, like the corresponding
    # check for result lists does.
    assert all(a not in nominatim_result.result for a in attributes), \
        f"Unexpectedly have one of the attributes '{attributes}' in\n" \
        f"{_pretty_json(nominatim_result.result)}"
@then(step_parse(
    r'the result contains array field (?P<field>\S+) where element (?P<num>\d+) contains'),
      converters={'num': int})
def check_result_array_field_for_attributes(nominatim_result, datatable, field, num):
    """ Check the attributes of one element of an array field of a
        single result against the expected values from the data table.

        The table may either have a 'param'/'value' header followed by
        one row per attribute, or consist of a row with the attribute
        names followed by a row with the expected values.
    """
    assert nominatim_result.is_simple()

    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    # Address the element as '<field>+<index>+<attribute>'.
    prefix = f"{field}+{num}+"

    for k, v in pairs:
        assert ResultAttr(nominatim_result.result, prefix + k) == v
@then(step_parse('the result set contains(?P<exact> exactly)?'))
def check_result_list_match(nominatim_result, datatable, exact):
    """ Check that every row of the data table matches a distinct result.

        The first row of the table names the attributes to compare. Each
        following row must match a result that has not been matched by an
        earlier row. With 'exactly', no unmatched result may remain.
    """
    assert not nominatim_result.is_simple()

    columns = datatable[0]
    # Indexes of the results that have not been matched by a row yet.
    result_set = set(range(len(nominatim_result.result)))

    for row in datatable[1:]:
        for idx in result_set:
            candidate = nominatim_result.result[idx]
            if not any(ResultAttr(candidate, key) != value
                       for key, value in zip(columns, row)):
                # Found a match. Leave the loop right away because the set
                # must not be iterated any further once it has been changed.
                result_set.remove(idx)
                break
        else:
            assert False, f"Missing data row {row}. Full response:\n{nominatim_result}"

    if exact:
        assert not [nominatim_result.result[i] for i in result_set]
@then(step_parse('all results have attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_all_results_for_field_presence(nominatim_result, attributes):
    """ Check that the result list is not empty and that each of its
        results has all of the listed attributes.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > 0

    for res in nominatim_result.result:
        assert not any(name not in res for name in attributes), \
            f"Missing one of the attributes '{attributes}' in\n{_pretty_json(res)}"
@then(step_parse('all results have no attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_all_result_for_field_absence(nominatim_result, attributes):
    """ Check that the result list is not empty and that none of its
        results has any of the listed attributes.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > 0

    for res in nominatim_result.result:
        assert not any(name in res for name in attributes), \
            f"Unexpectedly have one of the attributes '{attributes}' in\n{_pretty_json(res)}"
@then(step_parse(r'all results contain(?: in field (?P<field>\S+))?'))
def check_all_results_contain(nominatim_result, datatable, node_grid, field):
    """ Check the attributes of every result in a non-empty result list
        against the expected values from the data table.

        The table may either have a 'param'/'value' header followed by
        one row per attribute, or consist of a row with the attribute
        names followed by a row with the expected values. When 'field'
        is given, it is prepended to each attribute name, joined with '+'.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > 0

    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    prefix = field + '+' if field else ''

    for k, v in pairs:
        for r in nominatim_result.result:
            assert ResultAttr(r, prefix + k, grid=node_grid) == v
@then(step_parse(r'result (?P<num>\d+) contains(?: in field (?P<field>\S+))?'),
      converters={'num': int})
def check_specific_result_for_fields(nominatim_result, datatable, num, field):
    """ Check the attributes of the result at position 'num' of the
        result list against the expected values from the data table.

        The table may either have a 'param'/'value' header followed by
        one row per attribute, or consist of a row with the attribute
        names followed by a row with the expected values. When 'field'
        is given, it is prepended to each attribute name, joined with '+'.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > num

    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    prefix = field + '+' if field else ''

    for k, v in pairs:
        assert ResultAttr(nominatim_result.result[num], prefix + k) == v
@given(step_parse(r'the (?P<step>[0-9.]+ )?grid(?: with origin (?P<origin>.*))?'),
       target_fixture='node_grid')
def set_node_grid(datatable, step, origin):
    """ Create the node grid from the data table of the step.

        The optional origin may be given either as 'x,y' coordinates
        or as one of the names defined in ALIASES. Any other origin
        raises a RuntimeError.
    """
    # NOTE(review): numeric conversion of the step reconstructed; confirm
    # that Grid expects a float here.
    if step is not None:
        step = float(step)

    if origin:
        if ',' in origin:
            coords = origin.split(',')
            if len(coords) != 2:
                raise RuntimeError('Grid origin expects origin with x,y coordinates.')
            origin = list(map(float, coords))
        elif origin in ALIASES:
            origin = ALIASES[origin]
        else:
            raise RuntimeError('Grid origin must be either coordinate or alias.')

    return Grid(datatable, step, origin)
@then(step_parse('(?P<table>placex?) has no entry for '
                 r'(?P<osm_type>[NRW])(?P<osm_id>\d+)(?::(?P<osm_class>\S+))?'),
      converters={'osm_id': int})
def check_place_missing_lines(db_conn, table, osm_type, osm_id, osm_class):
    """ Check that the given table contains no row for the OSM object.

        When a class is given, only rows with that class are considered.
    """
    sql = pysql.SQL("""SELECT count(*) FROM {}
                       WHERE osm_type = %s and osm_id = %s""").format(pysql.Identifier(table))
    params = [osm_type, int(osm_id)]
    # The class is an optional part of the step text.
    if osm_class:
        sql += pysql.SQL(' AND class = %s')
        params.append(osm_class)

    with db_conn.cursor() as cur:
        assert cur.execute(sql, params).fetchone()[0] == 0
# The collection hook is only installed for pytest 8.0.0 and later.
if pytest.version_tuple >= (8, 0, 0):
    def pytest_pycollect_makemodule(module_path, parent):
        """ Collect Python test modules with the BDD-aware collector.
        """
        collector = BddTestCollector.from_parent(parent, path=module_path)
        return collector
class BddTestCollector(pytest.Module):
    """ Module collector which, in addition to the regular items of the
        module, collects the feature files found under the paths listed
        in the module's PYTEST_BDD_SCENARIOS attribute.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def collect(self):
        """ Yield the regular items of the module followed by one
            FeatureFile collector for each feature that was found.
        """
        yield from super().collect()

        # Modules without the attribute simply contribute no features.
        for path in getattr(self.obj, 'PYTEST_BDD_SCENARIOS', ()):
            # The paths are taken relative to the directory of the module.
            feature_dir = Path(self.path.parent, path).resolve()
            for feature in get_features([str(feature_dir)]):
                yield FeatureFile.from_parent(self,
                                              name=str(Path(path, feature.rel_filename)),
                                              path=Path(feature.filename),
                                              feature=feature)
# borrowed from pytest-bdd: src/pytest_bdd/scenario.py
def make_python_name(string: str) -> str:
    """Turn an arbitrary string into a name usable as a Python attribute.

    Spaces become underscores, all other non-word characters are removed,
    leading digits (with underscores following them) are stripped and the
    result is converted to lower case.
    """
    underscored = string.replace(" ", "_")
    word_chars_only = re.sub(r"\W", "", underscored)
    without_leading_digits = re.sub(r"^\d+_*", "", word_chars_only)
    return without_leading_digits.lower()
class FeatureFile(pytest.File):
    """ Collector for a single feature file, which creates one test class
        with a single test function for each scenario of the feature.
    """
    # NOTE(review): placeholder namespace reconstructed from the use of
    # 'self.obj' in collect(); confirm against the original definition.
    class obj:
        pass

    def __init__(self, feature, **kwargs):
        self.feature = feature
        super().__init__(**kwargs)

    def collect(self):
        """ Yield one test class for each scenario of the feature. The
            class is named after the line number of the scenario.
        """
        for sname, sobject in self.feature.scenarios.items():
            class_name = f"L{sobject.line_number}"
            test_name = "test_" + make_python_name(sname)

            # The decorator binds the scenario to the otherwise empty
            # test function.
            @scenario(self.feature.filename, sname)
            def _test():
                pass

            tclass = type(class_name, (),
                          {test_name: staticmethod(_test)})
            setattr(self.obj, class_name, tclass)

            yield pytest.Class.from_parent(self, name=class_name, obj=tclass)