1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Fixtures for BDD test steps
 
  12 from pathlib import Path
 
  15 from psycopg import sql as pysql
 
  17 # always test against the source
 
  18 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
 
  19 sys.path.insert(0, str(SRC_DIR / 'src'))
 
  22 from pytest_bdd.parsers import re as step_parse
 
  23 from pytest_bdd import given, when, then
 
  25 pytest.register_assert_rewrite('utils')
 
  27 from utils.api_runner import APIRunner
 
  28 from utils.api_result import APIResult
 
  29 from utils.checks import ResultAttr, COMPARATOR_TERMS
 
  30 from utils.geometry_alias import ALIASES
 
  31 from utils.grid import Grid
 
  32 from utils.db import DBManager
 
  34 from nominatim_db.config import Configuration
 
  35 from nominatim_db.data.country_info import setup_country_config
 
  39     return [s.strip() for s in inp.split(',')]
 
  42 def _pretty_json(inp):
 
  43     return json.dumps(inp, indent=2)
 
  46 def pytest_addoption(parser, pluginmanager):
 
  47     parser.addoption('--nominatim-purge', dest='NOMINATIM_PURGE', action='store_true',
 
  48                      help='Force recreation of test databases from scratch.')
 
  49     parser.addoption('--nominatim-keep-db', dest='NOMINATIM_KEEP_DB', action='store_true',
 
  50                      help='Do not drop the database after tests are finished.')
 
  51     parser.addoption('--nominatim-api-engine', dest='NOMINATIM_API_ENGINE',
 
  53                      help='Chose the API engine to use when sending requests.')
 
  54     parser.addoption('--nominatim-tokenizer', dest='NOMINATIM_TOKENIZER',
 
  56                      help='Use the specified tokenizer for importing data into '
 
  57                           'a Nominatim database.')
 
  59     parser.addini('nominatim_test_db', default='test_nominatim',
 
  60                   help='Name of the database used for running a single test.')
 
  61     parser.addini('nominatim_api_test_db', default='test_api_nominatim',
 
  62                   help='Name of the database for storing API test data.')
 
  63     parser.addini('nominatim_template_db', default='test_template_nominatim',
 
  64                   help='Name of database used as a template for test databases.')
 
  69     """ Default fixture for datatables, so that their presence can be optional.
 
  76     """ Default fixture for node grids. Nothing set.
 
  78     return Grid([[]], None, None)
 
  81 @pytest.fixture(scope='session', autouse=True)
 
  82 def setup_country_info():
 
  83     setup_country_config(Configuration(None))
 
  86 @pytest.fixture(scope='session')
 
  87 def template_db(pytestconfig):
 
  88     """ Create a template database containing the extensions and base data
 
  89         needed by Nominatim. Using the template instead of doing the full
 
  90         setup can speed up the tests.
 
  92         The template database will only be created if it does not exist yet
 
  93         or a purge has been explicitly requested.
 
  95     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
 
  97     template_db = pytestconfig.getini('nominatim_template_db')
 
  99     template_config = Configuration(
 
 100         None, environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={template_db}"})
 
 102     dbm.setup_template_db(template_config)
 
 108 def def_config(pytestconfig):
 
 109     dbname = pytestconfig.getini('nominatim_test_db')
 
 111     return Configuration(None,
 
 112                          environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={dbname}"})
 
 116 def db(template_db, pytestconfig):
 
 117     """ Set up an empty database for use with osm2pgsql.
 
 119     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
 
 121     dbname = pytestconfig.getini('nominatim_test_db')
 
 123     dbm.create_db_from_template(dbname, template_db)
 
 127     if not pytestconfig.option.NOMINATIM_KEEP_DB:
 
 132 def db_conn(db, def_config):
 
 133     with psycopg.connect(def_config.get_libpq_dsn()) as conn:
 
 134         info = psycopg.types.TypeInfo.fetch(conn, "hstore")
 
 135         psycopg.types.hstore.register_hstore(info, conn)
 
 139 @when(step_parse(r'reverse geocoding (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)'),
 
 140       target_fixture='nominatim_result')
 
 141 def reverse_geocode_via_api(test_config_env, pytestconfig, datatable, lat, lon):
 
 142     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
 
 143     api_response = runner.run_step('reverse',
 
 144                                    {'lat': float(lat), 'lon': float(lon)},
 
 145                                    datatable, 'jsonv2', {})
 
 147     assert api_response.status == 200
 
 148     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
 
 150     result = APIResult('json', 'reverse', api_response.body)
 
 151     assert result.is_simple()
 
 153     assert isinstance(result.result['lat'], str)
 
 154     assert isinstance(result.result['lon'], str)
 
 155     result.result['centroid'] = f"POINT({result.result['lon']} {result.result['lat']})"
 
 160 @when(step_parse(r'reverse geocoding at node (?P<node>[\d]+)'),
 
 161       target_fixture='nominatim_result')
 
 162 def reverse_geocode_via_api_and_grid(test_config_env, pytestconfig, node_grid, datatable, node):
 
 163     coords = node_grid.get(node)
 
 165         raise ValueError('Unknown node id')
 
 167     return reverse_geocode_via_api(test_config_env, pytestconfig, datatable, coords[1], coords[0])
 
 170 @when(step_parse(r'geocoding(?: "(?P<query>.*)")?'),
 
 171       target_fixture='nominatim_result')
 
 172 def forward_geocode_via_api(test_config_env, pytestconfig, datatable, query):
 
 173     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
 
 175     params = {'addressdetails': '1'}
 
 179     api_response = runner.run_step('search', params, datatable, 'jsonv2', {})
 
 181     assert api_response.status == 200
 
 182     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
 
 184     result = APIResult('json', 'search', api_response.body)
 
 185     assert not result.is_simple()
 
 187     for res in result.result:
 
 188         assert isinstance(res['lat'], str)
 
 189         assert isinstance(res['lon'], str)
 
 190         res['centroid'] = f"POINT({res['lon']} {res['lat']})"
 
 195 @then(step_parse(r'(?P<op>[a-z ]+) (?P<num>\d+) results? (?:are|is) returned'),
 
 196       converters={'num': int})
 
 197 def check_number_of_results(nominatim_result, op, num):
 
 198     assert not nominatim_result.is_simple()
 
 199     assert COMPARATOR_TERMS[op](num, len(nominatim_result))
 
 202 @then(step_parse('the result metadata contains'))
 
 203 def check_metadata_for_fields(nominatim_result, datatable):
 
 204     if datatable[0] == ['param', 'value']:
 
 205         pairs = datatable[1:]
 
 207         pairs = zip(datatable[0], datatable[1])
 
 210         assert ResultAttr(nominatim_result.meta, k) == v
 
 213 @then(step_parse('the result metadata has no attributes (?P<attributes>.*)'),
 
 214       converters={'attributes': _strlist})
 
 215 def check_metadata_for_field_presence(nominatim_result, attributes):
 
 216     assert all(a not in nominatim_result.meta for a in attributes), \
 
 217         f"Unexpectedly have one of the attributes '{attributes}' in\n" \
 
 218         f"{_pretty_json(nominatim_result.meta)}"
 
 221 @then(step_parse(r'the result contains(?: in field (?P<field>\S+))?'))
 
 222 def check_result_for_fields(nominatim_result, datatable, node_grid, field):
 
 223     assert nominatim_result.is_simple()
 
 225     if datatable[0] == ['param', 'value']:
 
 226         pairs = datatable[1:]
 
 228         pairs = zip(datatable[0], datatable[1])
 
 230     prefix = field + '+' if field else ''
 
 233         assert ResultAttr(nominatim_result.result, prefix + k, grid=node_grid) == v
 
 236 @then(step_parse('the result has attributes (?P<attributes>.*)'),
 
 237       converters={'attributes': _strlist})
 
 238 def check_result_for_field_presence(nominatim_result, attributes):
 
 239     assert nominatim_result.is_simple()
 
 240     assert all(a in nominatim_result.result for a in attributes)
 
 243 @then(step_parse('the result has no attributes (?P<attributes>.*)'),
 
 244       converters={'attributes': _strlist})
 
 245 def check_result_for_field_absence(nominatim_result, attributes):
 
 246     assert nominatim_result.is_simple()
 
 247     assert all(a not in nominatim_result.result for a in attributes)
 
 250 @then(step_parse('the result set contains(?P<exact> exactly)?'))
 
 251 def check_result_list_match(nominatim_result, datatable, exact):
 
 252     assert not nominatim_result.is_simple()
 
 254     result_set = set(range(len(nominatim_result.result)))
 
 256     for row in datatable[1:]:
 
 257         for idx in result_set:
 
 258             for key, value in zip(datatable[0], row):
 
 259                 if ResultAttr(nominatim_result.result[idx], key) != value:
 
 263                 result_set.remove(idx)
 
 266             assert False, f"Missing data row {row}. Full response:\n{nominatim_result}"
 
 269         assert not [nominatim_result.result[i] for i in result_set]
 
 272 @then(step_parse('all results have attributes (?P<attributes>.*)'),
 
 273       converters={'attributes': _strlist})
 
 274 def check_all_results_for_field_presence(nominatim_result, attributes):
 
 275     assert not nominatim_result.is_simple()
 
 276     assert len(nominatim_result) > 0
 
 277     for res in nominatim_result.result:
 
 278         assert all(a in res for a in attributes), \
 
 279             f"Missing one of the attributes '{attributes}' in\n{_pretty_json(res)}"
 
 282 @then(step_parse('all results have no attributes (?P<attributes>.*)'),
 
 283       converters={'attributes': _strlist})
 
 284 def check_all_result_for_field_absence(nominatim_result, attributes):
 
 285     assert not nominatim_result.is_simple()
 
 286     assert len(nominatim_result) > 0
 
 287     for res in nominatim_result.result:
 
 288         assert all(a not in res for a in attributes), \
 
 289             f"Unexpectedly have one of the attributes '{attributes}' in\n{_pretty_json(res)}"
 
 292 @then(step_parse(r'all results contain(?: in field (?P<field>\S+))?'))
 
 293 def check_all_results_contain(nominatim_result, datatable, node_grid, field):
 
 294     assert not nominatim_result.is_simple()
 
 295     assert len(nominatim_result) > 0
 
 297     if datatable[0] == ['param', 'value']:
 
 298         pairs = datatable[1:]
 
 300         pairs = zip(datatable[0], datatable[1])
 
 302     prefix = field + '+' if field else ''
 
 305         for r in nominatim_result.result:
 
 306             assert ResultAttr(r, prefix + k, grid=node_grid) == v
 
 309 @then(step_parse(r'result (?P<num>\d+) contains(?: in field (?P<field>\S+))?'),
 
 310       converters={'num': int})
 
 311 def check_specific_result_for_fields(nominatim_result, datatable, num, field):
 
 312     assert not nominatim_result.is_simple()
 
 313     assert len(nominatim_result) > num
 
 315     if datatable[0] == ['param', 'value']:
 
 316         pairs = datatable[1:]
 
 318         pairs = zip(datatable[0], datatable[1])
 
 320     prefix = field + '+' if field else ''
 
 323         assert ResultAttr(nominatim_result.result[num], prefix + k) == v
 
 326 @given(step_parse(r'the (?P<step>[0-9.]+ )?grid(?: with origin (?P<origin>.*))?'),
 
 327        target_fixture='node_grid')
 
 328 def set_node_grid(datatable, step, origin):
 
 334             coords = origin.split(',')
 
 336                 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
 
 337             origin = list(map(float, coords))
 
 338         elif origin in ALIASES:
 
 339             origin = ALIASES[origin]
 
 341             raise RuntimeError('Grid origin must be either coordinate or alias.')
 
 343     return Grid(datatable, step, origin)
 
 346 @then(step_parse('(?P<table>placex?) has no entry for '
 
 347                  r'(?P<osm_type>[NRW])(?P<osm_id>\d+)(?::(?P<osm_class>\S+))?'),
 
 348       converters={'osm_id': int})
 
 349 def check_place_missing_lines(db_conn, table, osm_type, osm_id, osm_class):
 
 350     sql = pysql.SQL("""SELECT count(*) FROM {}
 
 351                        WHERE osm_type = %s and osm_id = %s""").format(pysql.Identifier(table))
 
 352     params = [osm_type, int(osm_id)]
 
 354         sql += pysql.SQL(' AND class = %s')
 
 355         params.append(osm_class)
 
 357     with db_conn.cursor() as cur:
 
 358         assert cur.execute(sql, params).fetchone()[0] == 0