]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/conftest.py
Merge pull request #3833 from lonvia/rework-logging
[nominatim.git] / test / bdd / conftest.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Fixtures for BDD test steps
9 """
10 import sys
11 import json
12 from pathlib import Path
13
14 import psycopg
15 from psycopg import sql as pysql
16
17 # always test against the source
18 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
19 sys.path.insert(0, str(SRC_DIR / 'src'))
20
21 import pytest
22 from pytest_bdd.parsers import re as step_parse
23 from pytest_bdd import given, when, then
24
25 pytest.register_assert_rewrite('utils')
26
27 from utils.api_runner import APIRunner
28 from utils.api_result import APIResult
29 from utils.checks import ResultAttr, COMPARATOR_TERMS
30 from utils.geometry_alias import ALIASES
31 from utils.grid import Grid
32 from utils.db import DBManager
33
34 from nominatim_db.config import Configuration
35 from nominatim_db.data.country_info import setup_country_config
36
37
38 def _strlist(inp):
39     return [s.strip() for s in inp.split(',')]
40
41
42 def _pretty_json(inp):
43     return json.dumps(inp, indent=2)
44
45
46 def pytest_addoption(parser, pluginmanager):
47     parser.addoption('--nominatim-purge', dest='NOMINATIM_PURGE', action='store_true',
48                      help='Force recreation of test databases from scratch.')
49     parser.addoption('--nominatim-keep-db', dest='NOMINATIM_KEEP_DB', action='store_true',
50                      help='Do not drop the database after tests are finished.')
51     parser.addoption('--nominatim-api-engine', dest='NOMINATIM_API_ENGINE',
52                      default='falcon',
53                      help='Chose the API engine to use when sending requests.')
54     parser.addoption('--nominatim-tokenizer', dest='NOMINATIM_TOKENIZER',
55                      metavar='TOKENIZER',
56                      help='Use the specified tokenizer for importing data into '
57                           'a Nominatim database.')
58
59     parser.addini('nominatim_test_db', default='test_nominatim',
60                   help='Name of the database used for running a single test.')
61     parser.addini('nominatim_api_test_db', default='test_api_nominatim',
62                   help='Name of the database for storing API test data.')
63     parser.addini('nominatim_template_db', default='test_template_nominatim',
64                   help='Name of database used as a template for test databases.')
65
66
67 @pytest.fixture
68 def datatable():
69     """ Default fixture for datatables, so that their presence can be optional.
70     """
71     return None
72
73
74 @pytest.fixture
75 def node_grid():
76     """ Default fixture for node grids. Nothing set.
77     """
78     return Grid([[]], None, None)
79
80
81 @pytest.fixture(scope='session', autouse=True)
82 def setup_country_info():
83     setup_country_config(Configuration(None))
84
85
86 @pytest.fixture(scope='session')
87 def template_db(pytestconfig):
88     """ Create a template database containing the extensions and base data
89         needed by Nominatim. Using the template instead of doing the full
90         setup can speed up the tests.
91
92         The template database will only be created if it does not exist yet
93         or a purge has been explicitly requested.
94     """
95     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
96
97     template_db = pytestconfig.getini('nominatim_template_db')
98
99     template_config = Configuration(
100         None, environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={template_db}"})
101
102     dbm.setup_template_db(template_config)
103
104     return template_db
105
106
107 @pytest.fixture
108 def def_config(pytestconfig):
109     dbname = pytestconfig.getini('nominatim_test_db')
110
111     return Configuration(None,
112                          environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={dbname}"})
113
114
115 @pytest.fixture
116 def db(template_db, pytestconfig):
117     """ Set up an empty database for use with osm2pgsql.
118     """
119     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
120
121     dbname = pytestconfig.getini('nominatim_test_db')
122
123     dbm.create_db_from_template(dbname, template_db)
124
125     yield dbname
126
127     if not pytestconfig.option.NOMINATIM_KEEP_DB:
128         dbm.drop_db(dbname)
129
130
131 @pytest.fixture
132 def db_conn(db, def_config):
133     with psycopg.connect(def_config.get_libpq_dsn()) as conn:
134         info = psycopg.types.TypeInfo.fetch(conn, "hstore")
135         psycopg.types.hstore.register_hstore(info, conn)
136         yield conn
137
138
139 @when(step_parse(r'reverse geocoding (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)'),
140       target_fixture='nominatim_result')
141 def reverse_geocode_via_api(test_config_env, pytestconfig, datatable, lat, lon):
142     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
143     api_response = runner.run_step('reverse',
144                                    {'lat': float(lat), 'lon': float(lon)},
145                                    datatable, 'jsonv2', {})
146
147     assert api_response.status == 200
148     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
149
150     result = APIResult('json', 'reverse', api_response.body)
151     assert result.is_simple()
152
153     assert isinstance(result.result['lat'], str)
154     assert isinstance(result.result['lon'], str)
155     result.result['centroid'] = f"POINT({result.result['lon']} {result.result['lat']})"
156
157     return result
158
159
160 @when(step_parse(r'reverse geocoding at node (?P<node>[\d]+)'),
161       target_fixture='nominatim_result')
162 def reverse_geocode_via_api_and_grid(test_config_env, pytestconfig, node_grid, datatable, node):
163     coords = node_grid.get(node)
164     if coords is None:
165         raise ValueError('Unknown node id')
166
167     return reverse_geocode_via_api(test_config_env, pytestconfig, datatable, coords[1], coords[0])
168
169
170 @when(step_parse(r'geocoding(?: "(?P<query>.*)")?'),
171       target_fixture='nominatim_result')
172 def forward_geocode_via_api(test_config_env, pytestconfig, datatable, query):
173     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
174
175     params = {'addressdetails': '1'}
176     if query:
177         params['q'] = query
178
179     api_response = runner.run_step('search', params, datatable, 'jsonv2', {})
180
181     assert api_response.status == 200
182     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
183
184     result = APIResult('json', 'search', api_response.body)
185     assert not result.is_simple()
186
187     for res in result.result:
188         assert isinstance(res['lat'], str)
189         assert isinstance(res['lon'], str)
190         res['centroid'] = f"POINT({res['lon']} {res['lat']})"
191
192     return result
193
194
195 @then(step_parse(r'(?P<op>[a-z ]+) (?P<num>\d+) results? (?:are|is) returned'),
196       converters={'num': int})
197 def check_number_of_results(nominatim_result, op, num):
198     assert not nominatim_result.is_simple()
199     assert COMPARATOR_TERMS[op](num, len(nominatim_result))
200
201
202 @then(step_parse('the result metadata contains'))
203 def check_metadata_for_fields(nominatim_result, datatable):
204     if datatable[0] == ['param', 'value']:
205         pairs = datatable[1:]
206     else:
207         pairs = zip(datatable[0], datatable[1])
208
209     for k, v in pairs:
210         assert ResultAttr(nominatim_result.meta, k) == v
211
212
213 @then(step_parse('the result metadata has no attributes (?P<attributes>.*)'),
214       converters={'attributes': _strlist})
215 def check_metadata_for_field_presence(nominatim_result, attributes):
216     assert all(a not in nominatim_result.meta for a in attributes), \
217         f"Unexpectedly have one of the attributes '{attributes}' in\n" \
218         f"{_pretty_json(nominatim_result.meta)}"
219
220
221 @then(step_parse(r'the result contains(?: in field (?P<field>\S+))?'))
222 def check_result_for_fields(nominatim_result, datatable, node_grid, field):
223     assert nominatim_result.is_simple()
224
225     if datatable[0] == ['param', 'value']:
226         pairs = datatable[1:]
227     else:
228         pairs = zip(datatable[0], datatable[1])
229
230     prefix = field + '+' if field else ''
231
232     for k, v in pairs:
233         assert ResultAttr(nominatim_result.result, prefix + k, grid=node_grid) == v
234
235
236 @then(step_parse('the result has attributes (?P<attributes>.*)'),
237       converters={'attributes': _strlist})
238 def check_result_for_field_presence(nominatim_result, attributes):
239     assert nominatim_result.is_simple()
240     assert all(a in nominatim_result.result for a in attributes)
241
242
243 @then(step_parse('the result has no attributes (?P<attributes>.*)'),
244       converters={'attributes': _strlist})
245 def check_result_for_field_absence(nominatim_result, attributes):
246     assert nominatim_result.is_simple()
247     assert all(a not in nominatim_result.result for a in attributes)
248
249
250 @then(step_parse(
251     r'the result contains array field (?P<field>\S+) where element (?P<num>\d+) contains'),
252       converters={'num': int})
253 def check_result_array_field_for_attributes(nominatim_result, datatable, field, num):
254     assert nominatim_result.is_simple()
255
256     if datatable[0] == ['param', 'value']:
257         pairs = datatable[1:]
258     else:
259         pairs = zip(datatable[0], datatable[1])
260
261     prefix = f"{field}+{num}+"
262
263     for k, v in pairs:
264         assert ResultAttr(nominatim_result.result, prefix + k) == v
265
266
267 @then(step_parse('the result set contains(?P<exact> exactly)?'))
268 def check_result_list_match(nominatim_result, datatable, exact):
269     assert not nominatim_result.is_simple()
270
271     result_set = set(range(len(nominatim_result.result)))
272
273     for row in datatable[1:]:
274         for idx in result_set:
275             for key, value in zip(datatable[0], row):
276                 if ResultAttr(nominatim_result.result[idx], key) != value:
277                     break
278             else:
279                 # found a match
280                 result_set.remove(idx)
281                 break
282         else:
283             assert False, f"Missing data row {row}. Full response:\n{nominatim_result}"
284
285     if exact:
286         assert not [nominatim_result.result[i] for i in result_set]
287
288
289 @then(step_parse('all results have attributes (?P<attributes>.*)'),
290       converters={'attributes': _strlist})
291 def check_all_results_for_field_presence(nominatim_result, attributes):
292     assert not nominatim_result.is_simple()
293     assert len(nominatim_result) > 0
294     for res in nominatim_result.result:
295         assert all(a in res for a in attributes), \
296             f"Missing one of the attributes '{attributes}' in\n{_pretty_json(res)}"
297
298
299 @then(step_parse('all results have no attributes (?P<attributes>.*)'),
300       converters={'attributes': _strlist})
301 def check_all_result_for_field_absence(nominatim_result, attributes):
302     assert not nominatim_result.is_simple()
303     assert len(nominatim_result) > 0
304     for res in nominatim_result.result:
305         assert all(a not in res for a in attributes), \
306             f"Unexpectedly have one of the attributes '{attributes}' in\n{_pretty_json(res)}"
307
308
309 @then(step_parse(r'all results contain(?: in field (?P<field>\S+))?'))
310 def check_all_results_contain(nominatim_result, datatable, node_grid, field):
311     assert not nominatim_result.is_simple()
312     assert len(nominatim_result) > 0
313
314     if datatable[0] == ['param', 'value']:
315         pairs = datatable[1:]
316     else:
317         pairs = zip(datatable[0], datatable[1])
318
319     prefix = field + '+' if field else ''
320
321     for k, v in pairs:
322         for r in nominatim_result.result:
323             assert ResultAttr(r, prefix + k, grid=node_grid) == v
324
325
326 @then(step_parse(r'result (?P<num>\d+) contains(?: in field (?P<field>\S+))?'),
327       converters={'num': int})
328 def check_specific_result_for_fields(nominatim_result, datatable, num, field):
329     assert not nominatim_result.is_simple()
330     assert len(nominatim_result) > num
331
332     if datatable[0] == ['param', 'value']:
333         pairs = datatable[1:]
334     else:
335         pairs = zip(datatable[0], datatable[1])
336
337     prefix = field + '+' if field else ''
338
339     for k, v in pairs:
340         assert ResultAttr(nominatim_result.result[num], prefix + k) == v
341
342
343 @given(step_parse(r'the (?P<step>[0-9.]+ )?grid(?: with origin (?P<origin>.*))?'),
344        target_fixture='node_grid')
345 def set_node_grid(datatable, step, origin):
346     if step is not None:
347         step = float(step)
348
349     if origin:
350         if ',' in origin:
351             coords = origin.split(',')
352             if len(coords) != 2:
353                 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
354             origin = list(map(float, coords))
355         elif origin in ALIASES:
356             origin = ALIASES[origin]
357         else:
358             raise RuntimeError('Grid origin must be either coordinate or alias.')
359
360     return Grid(datatable, step, origin)
361
362
363 @then(step_parse('(?P<table>placex?) has no entry for '
364                  r'(?P<osm_type>[NRW])(?P<osm_id>\d+)(?::(?P<osm_class>\S+))?'),
365       converters={'osm_id': int})
366 def check_place_missing_lines(db_conn, table, osm_type, osm_id, osm_class):
367     sql = pysql.SQL("""SELECT count(*) FROM {}
368                        WHERE osm_type = %s and osm_id = %s""").format(pysql.Identifier(table))
369     params = [osm_type, int(osm_id)]
370     if osm_class:
371         sql += pysql.SQL(' AND class = %s')
372         params.append(osm_class)
373
374     with db_conn.cursor() as cur:
375         assert cur.execute(sql, params).fetchone()[0] == 0