1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Fixtures for BDD test steps
12 from pathlib import Path
15 from psycopg import sql as pysql
17 # always test against the source
18 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
19 sys.path.insert(0, str(SRC_DIR / 'src'))
22 from pytest_bdd.parsers import re as step_parse
23 from pytest_bdd import given, when, then
25 pytest.register_assert_rewrite('utils')
27 from utils.api_runner import APIRunner
28 from utils.api_result import APIResult
29 from utils.checks import ResultAttr, COMPARATOR_TERMS
30 from utils.geometry_alias import ALIASES
31 from utils.grid import Grid
32 from utils.db import DBManager
34 from nominatim_db.config import Configuration
35 from nominatim_db.data.country_info import setup_country_config
39 return [s.strip() for s in inp.split(',')]
42 def _pretty_json(inp):
43 return json.dumps(inp, indent=2)
46 def pytest_addoption(parser, pluginmanager):
47 parser.addoption('--nominatim-purge', dest='NOMINATIM_PURGE', action='store_true',
48 help='Force recreation of test databases from scratch.')
49 parser.addoption('--nominatim-keep-db', dest='NOMINATIM_KEEP_DB', action='store_true',
50 help='Do not drop the database after tests are finished.')
51 parser.addoption('--nominatim-api-engine', dest='NOMINATIM_API_ENGINE',
53 help='Chose the API engine to use when sending requests.')
54 parser.addoption('--nominatim-tokenizer', dest='NOMINATIM_TOKENIZER',
56 help='Use the specified tokenizer for importing data into '
57 'a Nominatim database.')
59 parser.addini('nominatim_test_db', default='test_nominatim',
60 help='Name of the database used for running a single test.')
61 parser.addini('nominatim_api_test_db', default='test_api_nominatim',
62 help='Name of the database for storing API test data.')
63 parser.addini('nominatim_template_db', default='test_template_nominatim',
64 help='Name of database used as a template for test databases.')
69 """ Default fixture for datatables, so that their presence can be optional.
76 """ Default fixture for node grids. Nothing set.
78 return Grid([[]], None, None)
@pytest.fixture(scope='session', autouse=True)
def setup_country_info():
    """ Initialise the country configuration from a default Configuration.

        The fixture is session-scoped and autouse, so it runs once without
        tests having to request it.
    """
    default_config = Configuration(None)
    setup_country_config(default_config)
@pytest.fixture(scope='session')
def template_db(pytestconfig):
    """ Create a template database containing the extensions and base data
        needed by Nominatim. Using the template instead of doing the full
        setup can speed up the tests.

        The template database will only be created if it does not exist yet
        or a purge has been explicitly requested.

        Returns the name of the template database, as configured in the
        ini option 'nominatim_template_db'.
    """
    dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)

    # Use a distinct local name so the fixture function is not shadowed.
    template_name = pytestconfig.getini('nominatim_template_db')

    template_config = Configuration(
        None, environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={template_name}"})

    dbm.setup_template_db(template_config)

    # Dependent fixtures pass this value on as the template to copy from.
    return template_name
def def_config(pytestconfig):
    """ Return a Configuration whose database DSN points at the database
        named by the ini option 'nominatim_test_db'.
    """
    test_db_name = pytestconfig.getini('nominatim_test_db')
    environment = {'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={test_db_name}"}

    return Configuration(None, environ=environment)
116 def db(template_db, pytestconfig):
117 """ Set up an empty database for use with osm2pgsql.
119 dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
121 dbname = pytestconfig.getini('nominatim_test_db')
123 dbm.create_db_from_template(dbname, template_db)
127 if not pytestconfig.option.NOMINATIM_KEEP_DB:
def db_conn(db, def_config):
    """ Provide an open psycopg connection to the test database with the
        hstore type registered on it.

        `db` is not used directly; requesting it makes sure the database
        fixture has been set up before connecting. The connection is closed
        when the `with` block is left after the test has finished.
    """
    with psycopg.connect(def_config.get_libpq_dsn()) as conn:
        info = psycopg.types.TypeInfo.fetch(conn, "hstore")
        psycopg.types.hstore.register_hstore(info, conn)
        # Hand the live connection to the test; steps call conn.cursor() on it.
        yield conn
@when(step_parse(r'reverse geocoding (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)'),
      target_fixture='nominatim_result')
def reverse_geocode_via_api(test_config_env, pytestconfig, datatable, lat, lon):
    """ Send a 'reverse' request for the given coordinates and make the
        parsed response available as fixture `nominatim_result`.

        The request is made in format 'jsonv2'. The step fails unless the
        response has status 200, a JSON content type and is a simple
        (single) result. The datatable is forwarded to the runner as is.
    """
    runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
    api_response = runner.run_step('reverse',
                                   {'lat': float(lat), 'lon': float(lon)},
                                   datatable, 'jsonv2', {})

    assert api_response.status == 200
    assert api_response.headers['content-type'] == 'application/json; charset=utf-8'

    result = APIResult('json', 'reverse', api_response.body)
    assert result.is_simple()

    # The return value becomes the target fixture used by the 'then' steps.
    return result
@when(step_parse(r'reverse geocoding at node (?P<node>[\d]+)'),
      target_fixture='nominatim_result')
def reverse_geocode_via_api_and_grid(test_config_env, pytestconfig, node_grid, datatable, node):
    """ Send a 'reverse' request for the position of a grid node and make
        the parsed response available as fixture `nominatim_result`.

        The coordinates are looked up in `node_grid`; index 0 is sent as
        'lon' and index 1 as 'lat'. The result additionally gets a
        'centroid' entry, a POINT string built from its 'lon' and 'lat'
        values with seven decimal places.

        Raises ValueError when the grid has no coordinates for the node.
    """
    coords = node_grid.get(node)
    # NOTE(review): assumes the grid returns None for unknown nodes - confirm.
    if coords is None:
        raise ValueError('Unknown node id')

    runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
    api_response = runner.run_step('reverse',
                                   {'lat': coords[1], 'lon': coords[0]},
                                   datatable, 'jsonv2', {})

    assert api_response.status == 200
    assert api_response.headers['content-type'] == 'application/json; charset=utf-8'

    result = APIResult('json', 'reverse', api_response.body)
    assert result.is_simple()

    result.result['centroid'] = f"POINT({result.result['lon']:.7f} {result.result['lat']:.7f})"

    # The return value becomes the target fixture used by the 'then' steps.
    return result
@when(step_parse(r'geocoding(?: "(?P<query>.*)")?'),
      target_fixture='nominatim_result')
def forward_geocode_via_api(test_config_env, pytestconfig, datatable, query):
    """ Send a 'search' request and make the parsed response available as
        fixture `nominatim_result`.

        Address details are always requested. When the step text contains
        a quoted query, it is sent as the free-form query parameter 'q'.
        The datatable is forwarded to the runner as is.

        The step fails unless the response has status 200, a JSON content
        type and is a result list. Every result additionally gets a
        'centroid' entry, a POINT string built from its 'lon' and 'lat'
        values with seven decimal places.
    """
    runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)

    params = {'addressdetails': '1'}
    # The query is optional in the step text; only send it when given.
    if query:
        params['q'] = query

    api_response = runner.run_step('search', params, datatable, 'jsonv2', {})

    assert api_response.status == 200
    assert api_response.headers['content-type'] == 'application/json; charset=utf-8'

    result = APIResult('json', 'search', api_response.body)
    assert not result.is_simple()

    for res in result.result:
        res['centroid'] = f"POINT({res['lon']:.7f} {res['lat']:.7f})"

    # The return value becomes the target fixture used by the 'then' steps.
    return result
@then(step_parse(r'(?P<op>[a-z ]+) (?P<num>\d+) results? (?:are|is) returned'),
      converters={'num': int})
def check_number_of_results(nominatim_result, op, num):
    """ Compare the number of results against `num` using the comparator
        registered in COMPARATOR_TERMS under the phrase `op`.

        Only applicable to result lists.
    """
    assert not nominatim_result.is_simple()

    # The comparator receives the expected number first, then the actual one.
    compare = COMPARATOR_TERMS[op]
    actual = len(nominatim_result)
    assert compare(num, actual)
@then(step_parse('the result metadata contains'))
def check_metadata_for_fields(nominatim_result, datatable):
    """ Check attributes of the result metadata against the datatable.

        The table may come in two layouts: a header row 'param', 'value'
        followed by one key/value pair per row, or a row of keys followed
        by a single row with the corresponding values.
    """
    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    for k, v in pairs:
        assert ResultAttr(nominatim_result.meta, k) == v
@then(step_parse('the result metadata has no attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_metadata_for_field_presence(nominatim_result, attributes):
    """ Check that none of the listed attributes is present in the
        result metadata.
    """
    meta = nominatim_result.meta
    assert not any(name in meta for name in attributes), \
        f"Unexpectedly have one of the attributes '{attributes}' in\n" \
        f"{_pretty_json(nominatim_result.meta)}"
@then(step_parse(r'the result contains(?: in field (?P<field>\S+))?'))
def check_result_for_fields(nominatim_result, datatable, node_grid, field):
    """ Check attributes of a simple (single) result against the datatable.

        The table may come in two layouts: a header row 'param', 'value'
        followed by one key/value pair per row, or a row of keys followed
        by a single row with the corresponding values.

        When a field is named in the step, every key is prefixed with
        '<field>+' before it is handed to ResultAttr. The node grid is
        passed along to ResultAttr for the comparison.
    """
    assert nominatim_result.is_simple()

    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    prefix = field + '+' if field else ''

    for k, v in pairs:
        assert ResultAttr(nominatim_result.result, prefix + k, grid=node_grid) == v
@then(step_parse('the result has attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_result_for_field_presence(nominatim_result, attributes):
    """ Check that a simple (single) result contains all listed attributes.
    """
    assert nominatim_result.is_simple()

    found = nominatim_result.result
    assert not any(name not in found for name in attributes)
@then(step_parse('the result has no attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_result_for_field_absence(nominatim_result, attributes):
    """ Check that a simple (single) result contains none of the listed
        attributes.
    """
    assert nominatim_result.is_simple()

    found = nominatim_result.result
    assert not any(name in found for name in attributes)
@then(step_parse('the result set contains(?P<exact> exactly)?'))
def check_result_list_match(nominatim_result, datatable, exact):
    """ Check that every data row of the table is matched by a distinct
        entry of the result list.

        The first table row holds the attribute names. Each following row
        must match one result in all columns; a result that has matched a
        row is not considered for later rows. With 'exactly', no result
        may be left unmatched at the end.
    """
    assert not nominatim_result.is_simple()

    # Indexes of the results that have not been claimed by a table row yet.
    result_set = set(range(len(nominatim_result.result)))

    for row in datatable[1:]:
        for idx in result_set:
            for key, value in zip(datatable[0], row):
                if ResultAttr(nominatim_result.result[idx], key) != value:
                    break
            else:
                # All columns matched. Leaving the loop right after the
                # removal keeps the iteration over the set safe.
                result_set.remove(idx)
                break
        else:
            assert False, f"Missing data row {row}. Full response:\n{nominatim_result}"

    if exact:
        assert not [nominatim_result.result[i] for i in result_set]
@then(step_parse('all results have attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_all_results_for_field_presence(nominatim_result, attributes):
    """ Check that each entry of a non-empty result list contains all
        listed attributes.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > 0

    for entry in nominatim_result.result:
        assert not any(name not in entry for name in attributes), \
            f"Missing one of the attributes '{attributes}' in\n{_pretty_json(entry)}"
@then(step_parse('all results have no attributes (?P<attributes>.*)'),
      converters={'attributes': _strlist})
def check_all_result_for_field_absence(nominatim_result, attributes):
    """ Check that no entry of a non-empty result list contains any of
        the listed attributes.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > 0

    for entry in nominatim_result.result:
        assert not any(name in entry for name in attributes), \
            f"Unexpectedly have one of the attributes '{attributes}' in\n{_pretty_json(entry)}"
@then(step_parse(r'all results contain(?: in field (?P<field>\S+))?'))
def check_all_results_contain(nominatim_result, datatable, node_grid, field):
    """ Check attributes of every entry of a non-empty result list against
        the datatable.

        The table may come in two layouts: a header row 'param', 'value'
        followed by one key/value pair per row, or a row of keys followed
        by a single row with the corresponding values.

        When a field is named in the step, every key is prefixed with
        '<field>+' before it is handed to ResultAttr. The node grid is
        passed along to ResultAttr for the comparison.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > 0

    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    prefix = field + '+' if field else ''

    for k, v in pairs:
        for r in nominatim_result.result:
            assert ResultAttr(r, prefix + k, grid=node_grid) == v
@then(step_parse(r'result (?P<num>\d+) contains(?: in field (?P<field>\S+))?'),
      converters={'num': int})
def check_specific_result_for_fields(nominatim_result, datatable, num, field):
    """ Check attributes of the result at zero-based position `num` of a
        result list against the datatable.

        The table may come in two layouts: a header row 'param', 'value'
        followed by one key/value pair per row, or a row of keys followed
        by a single row with the corresponding values.

        When a field is named in the step, every key is prefixed with
        '<field>+' before it is handed to ResultAttr.
    """
    assert not nominatim_result.is_simple()
    assert len(nominatim_result) > num

    if datatable[0] == ['param', 'value']:
        pairs = datatable[1:]
    else:
        pairs = zip(datatable[0], datatable[1])

    prefix = field + '+' if field else ''

    for k, v in pairs:
        assert ResultAttr(nominatim_result.result[num], prefix + k) == v
332 @given(step_parse(r'the (?P<step>[0-9.]+ )?grid(?: with origin (?P<origin>.*))?'),
333 target_fixture='node_grid')
334 def set_node_grid(datatable, step, origin):
340 coords = origin.split(',')
342 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
343 origin = list(map(float, coords))
344 elif origin in ALIASES:
345 origin = ALIASES[origin]
347 raise RuntimeError('Grid origin must be either coordinate or alias.')
349 return Grid(datatable, step, origin)
@then(step_parse('(?P<table>placex?) has no entry for '
                 r'(?P<osm_type>[NRW])(?P<osm_id>\d+)(?::(?P<osm_class>\S+))?'),
      converters={'osm_id': int})
def check_place_missing_lines(db_conn, table, osm_type, osm_id, osm_class):
    """ Check that the given table ('place' or 'placex') has no row for
        the OSM object, optionally restricted to rows of a given class.

        The table name is inserted as a quoted identifier; all values are
        passed as query parameters.
    """
    sql = pysql.SQL("""SELECT count(*) FROM {}
                       WHERE osm_type = %s and osm_id = %s""").format(pysql.Identifier(table))
    params = [osm_type, int(osm_id)]

    # Only filter by class when the step names one. Comparing against NULL
    # would never match and let the check pass unconditionally.
    if osm_class is not None:
        sql += pysql.SQL(' AND class = %s')
        params.append(osm_class)

    with db_conn.cursor() as cur:
        assert cur.execute(sql, params).fetchone()[0] == 0