]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/conftest.py
add BDD tests for DB
[nominatim.git] / test / bdd / conftest.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Fixtures for BDD test steps
9 """
10 import sys
11 import json
12 from pathlib import Path
13
14 import psycopg
15 from psycopg import sql as pysql
16
17 # always test against the source
18 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
19 sys.path.insert(0, str(SRC_DIR / 'src'))
20
21 import pytest
22 from pytest_bdd.parsers import re as step_parse
23 from pytest_bdd import given, when, then
24
25 pytest.register_assert_rewrite('utils')
26
27 from utils.api_runner import APIRunner
28 from utils.api_result import APIResult
29 from utils.checks import ResultAttr, COMPARATOR_TERMS
30 from utils.geometry_alias import ALIASES
31 from utils.grid import Grid
32 from utils.db import DBManager
33
34 from nominatim_db.config import Configuration
35 from nominatim_db.data.country_info import setup_country_config
36
37
38 def _strlist(inp):
39     return [s.strip() for s in inp.split(',')]
40
41
42 def _pretty_json(inp):
43     return json.dumps(inp, indent=2)
44
45
46 def pytest_addoption(parser, pluginmanager):
47     parser.addoption('--nominatim-purge', dest='NOMINATIM_PURGE', action='store_true',
48                      help='Force recreation of test databases from scratch.')
49     parser.addoption('--nominatim-keep-db', dest='NOMINATIM_KEEP_DB', action='store_true',
50                      help='Do not drop the database after tests are finished.')
51     parser.addoption('--nominatim-api-engine', dest='NOMINATIM_API_ENGINE',
52                      default='falcon',
53                      help='Chose the API engine to use when sending requests.')
54     parser.addoption('--nominatim-tokenizer', dest='NOMINATIM_TOKENIZER',
55                      metavar='TOKENIZER',
56                      help='Use the specified tokenizer for importing data into '
57                           'a Nominatim database.')
58
59     parser.addini('nominatim_test_db', default='test_nominatim',
60                   help='Name of the database used for running a single test.')
61     parser.addini('nominatim_api_test_db', default='test_api_nominatim',
62                   help='Name of the database for storing API test data.')
63     parser.addini('nominatim_template_db', default='test_template_nominatim',
64                   help='Name of database used as a template for test databases.')
65
66
67 @pytest.fixture
68 def datatable():
69     """ Default fixture for datatables, so that their presence can be optional.
70     """
71     return None
72
73
74 @pytest.fixture
75 def node_grid():
76     """ Default fixture for node grids. Nothing set.
77     """
78     return Grid([[]], None, None)
79
80
81 @pytest.fixture(scope='session', autouse=True)
82 def setup_country_info():
83     setup_country_config(Configuration(None))
84
85
86 @pytest.fixture(scope='session')
87 def template_db(pytestconfig):
88     """ Create a template database containing the extensions and base data
89         needed by Nominatim. Using the template instead of doing the full
90         setup can speed up the tests.
91
92         The template database will only be created if it does not exist yet
93         or a purge has been explicitly requested.
94     """
95     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
96
97     template_db = pytestconfig.getini('nominatim_template_db')
98
99     template_config = Configuration(
100         None, environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={template_db}"})
101
102     dbm.setup_template_db(template_config)
103
104     return template_db
105
106
107 @pytest.fixture
108 def def_config(pytestconfig):
109     dbname = pytestconfig.getini('nominatim_test_db')
110
111     return Configuration(None,
112                          environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={dbname}"})
113
114
115 @pytest.fixture
116 def db(template_db, pytestconfig):
117     """ Set up an empty database for use with osm2pgsql.
118     """
119     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
120
121     dbname = pytestconfig.getini('nominatim_test_db')
122
123     dbm.create_db_from_template(dbname, template_db)
124
125     yield dbname
126
127     if not pytestconfig.option.NOMINATIM_KEEP_DB:
128         dbm.drop_db(dbname)
129
130
131 @pytest.fixture
132 def db_conn(db, def_config):
133     with psycopg.connect(def_config.get_libpq_dsn()) as conn:
134         info = psycopg.types.TypeInfo.fetch(conn, "hstore")
135         psycopg.types.hstore.register_hstore(info, conn)
136         yield conn
137
138
139 @when(step_parse(r'reverse geocoding (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)'),
140       target_fixture='nominatim_result')
141 def reverse_geocode_via_api(test_config_env, pytestconfig, datatable, lat, lon):
142     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
143     api_response = runner.run_step('reverse',
144                                    {'lat': float(lat), 'lon': float(lon)},
145                                    datatable, 'jsonv2', {})
146
147     assert api_response.status == 200
148     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
149
150     result = APIResult('json', 'reverse', api_response.body)
151     assert result.is_simple()
152
153     return result
154
155
156 @when(step_parse(r'reverse geocoding at node (?P<node>[\d]+)'),
157       target_fixture='nominatim_result')
158 def reverse_geocode_via_api_and_grid(test_config_env, pytestconfig, node_grid, datatable, node):
159     coords = node_grid.get(node)
160     if coords is None:
161         raise ValueError('Unknown node id')
162     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
163     api_response = runner.run_step('reverse',
164                                    {'lat': coords[1], 'lon': coords[0]},
165                                    datatable, 'jsonv2', {})
166
167     assert api_response.status == 200
168     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
169
170     result = APIResult('json', 'reverse', api_response.body)
171     assert result.is_simple()
172
173     result.result['centroid'] = f"POINT({result.result['lon']:.7f} {result.result['lat']:.7f})"
174
175     return result
176
177
178 @when(step_parse(r'geocoding(?: "(?P<query>.*)")?'),
179       target_fixture='nominatim_result')
180 def forward_geocode_via_api(test_config_env, pytestconfig, datatable, query):
181     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
182
183     params = {'addressdetails': '1'}
184     if query:
185         params['q'] = query
186
187     api_response = runner.run_step('search', params, datatable, 'jsonv2', {})
188
189     assert api_response.status == 200
190     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
191
192     result = APIResult('json', 'search', api_response.body)
193     assert not result.is_simple()
194
195     for res in result.result:
196         res['centroid'] = f"POINT({res['lon']:.7f} {res['lat']:.7f})"
197
198     return result
199
200
201 @then(step_parse(r'(?P<op>[a-z ]+) (?P<num>\d+) results? (?:are|is) returned'),
202       converters={'num': int})
203 def check_number_of_results(nominatim_result, op, num):
204     assert not nominatim_result.is_simple()
205     assert COMPARATOR_TERMS[op](num, len(nominatim_result))
206
207
208 @then(step_parse('the result metadata contains'))
209 def check_metadata_for_fields(nominatim_result, datatable):
210     if datatable[0] == ['param', 'value']:
211         pairs = datatable[1:]
212     else:
213         pairs = zip(datatable[0], datatable[1])
214
215     for k, v in pairs:
216         assert ResultAttr(nominatim_result.meta, k) == v
217
218
219 @then(step_parse('the result metadata has no attributes (?P<attributes>.*)'),
220       converters={'attributes': _strlist})
221 def check_metadata_for_field_presence(nominatim_result, attributes):
222     assert all(a not in nominatim_result.meta for a in attributes), \
223         f"Unexpectedly have one of the attributes '{attributes}' in\n" \
224         f"{_pretty_json(nominatim_result.meta)}"
225
226
227 @then(step_parse(r'the result contains(?: in field (?P<field>\S+))?'))
228 def check_result_for_fields(nominatim_result, datatable, node_grid, field):
229     assert nominatim_result.is_simple()
230
231     if datatable[0] == ['param', 'value']:
232         pairs = datatable[1:]
233     else:
234         pairs = zip(datatable[0], datatable[1])
235
236     prefix = field + '+' if field else ''
237
238     for k, v in pairs:
239         assert ResultAttr(nominatim_result.result, prefix + k, grid=node_grid) == v
240
241
242 @then(step_parse('the result has attributes (?P<attributes>.*)'),
243       converters={'attributes': _strlist})
244 def check_result_for_field_presence(nominatim_result, attributes):
245     assert nominatim_result.is_simple()
246     assert all(a in nominatim_result.result for a in attributes)
247
248
249 @then(step_parse('the result has no attributes (?P<attributes>.*)'),
250       converters={'attributes': _strlist})
251 def check_result_for_field_absence(nominatim_result, attributes):
252     assert nominatim_result.is_simple()
253     assert all(a not in nominatim_result.result for a in attributes)
254
255
256 @then(step_parse('the result set contains(?P<exact> exactly)?'))
257 def check_result_list_match(nominatim_result, datatable, exact):
258     assert not nominatim_result.is_simple()
259
260     result_set = set(range(len(nominatim_result.result)))
261
262     for row in datatable[1:]:
263         for idx in result_set:
264             for key, value in zip(datatable[0], row):
265                 if ResultAttr(nominatim_result.result[idx], key) != value:
266                     break
267             else:
268                 # found a match
269                 result_set.remove(idx)
270                 break
271         else:
272             assert False, f"Missing data row {row}. Full response:\n{nominatim_result}"
273
274     if exact:
275         assert not [nominatim_result.result[i] for i in result_set]
276
277
278 @then(step_parse('all results have attributes (?P<attributes>.*)'),
279       converters={'attributes': _strlist})
280 def check_all_results_for_field_presence(nominatim_result, attributes):
281     assert not nominatim_result.is_simple()
282     assert len(nominatim_result) > 0
283     for res in nominatim_result.result:
284         assert all(a in res for a in attributes), \
285             f"Missing one of the attributes '{attributes}' in\n{_pretty_json(res)}"
286
287
288 @then(step_parse('all results have no attributes (?P<attributes>.*)'),
289       converters={'attributes': _strlist})
290 def check_all_result_for_field_absence(nominatim_result, attributes):
291     assert not nominatim_result.is_simple()
292     assert len(nominatim_result) > 0
293     for res in nominatim_result.result:
294         assert all(a not in res for a in attributes), \
295             f"Unexpectedly have one of the attributes '{attributes}' in\n{_pretty_json(res)}"
296
297
298 @then(step_parse(r'all results contain(?: in field (?P<field>\S+))?'))
299 def check_all_results_contain(nominatim_result, datatable, node_grid, field):
300     assert not nominatim_result.is_simple()
301     assert len(nominatim_result) > 0
302
303     if datatable[0] == ['param', 'value']:
304         pairs = datatable[1:]
305     else:
306         pairs = zip(datatable[0], datatable[1])
307
308     prefix = field + '+' if field else ''
309
310     for k, v in pairs:
311         for r in nominatim_result.result:
312             assert ResultAttr(r, prefix + k, grid=node_grid) == v
313
314
315 @then(step_parse(r'result (?P<num>\d+) contains(?: in field (?P<field>\S+))?'),
316       converters={'num': int})
317 def check_specific_result_for_fields(nominatim_result, datatable, num, field):
318     assert not nominatim_result.is_simple()
319     assert len(nominatim_result) > num
320
321     if datatable[0] == ['param', 'value']:
322         pairs = datatable[1:]
323     else:
324         pairs = zip(datatable[0], datatable[1])
325
326     prefix = field + '+' if field else ''
327
328     for k, v in pairs:
329         assert ResultAttr(nominatim_result.result[num], prefix + k) == v
330
331
332 @given(step_parse(r'the (?P<step>[0-9.]+ )?grid(?: with origin (?P<origin>.*))?'),
333        target_fixture='node_grid')
334 def set_node_grid(datatable, step, origin):
335     if step is not None:
336         step = float(step)
337
338     if origin:
339         if ',' in origin:
340             coords = origin.split(',')
341             if len(coords) != 2:
342                 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
343             origin = list(map(float, coords))
344         elif origin in ALIASES:
345             origin = ALIASES[origin]
346         else:
347             raise RuntimeError('Grid origin must be either coordinate or alias.')
348
349     return Grid(datatable, step, origin)
350
351
352 @then(step_parse('(?P<table>placex?) has no entry for '
353                  r'(?P<osm_type>[NRW])(?P<osm_id>\d+)(?::(?P<osm_class>\S+))?'),
354       converters={'osm_id': int})
355 def check_place_missing_lines(db_conn, table, osm_type, osm_id, osm_class):
356     sql = pysql.SQL("""SELECT count(*) FROM {}
357                        WHERE osm_type = %s and osm_id = %s""").format(pysql.Identifier(table))
358     params = [osm_type, int(osm_id)]
359     if osm_class:
360         sql += pysql.SQL(' AND class = %s')
361         params.append(osm_class)
362
363     with db_conn.cursor() as cur:
364         assert cur.execute(sql, params).fetchone()[0] == 0