]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/conftest.py
move database setup to generic conftest.py
[nominatim.git] / test / bdd / conftest.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Fixtures for BDD test steps
9 """
10 import sys
11 import json
12 from pathlib import Path
13
14 import psycopg
15
16 # always test against the source
17 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
18 sys.path.insert(0, str(SRC_DIR / 'src'))
19
20 import pytest
21 from pytest_bdd.parsers import re as step_parse
22 from pytest_bdd import given, when, then
23
24 pytest.register_assert_rewrite('utils')
25
26 from utils.api_runner import APIRunner
27 from utils.api_result import APIResult
28 from utils.checks import ResultAttr, COMPARATOR_TERMS, check_table_content, check_table_has_lines
29 from utils.geometry_alias import ALIASES
30 from utils.grid import Grid
31 from utils.db import DBManager
32
33 from nominatim_db.config import Configuration
34 from nominatim_db import cli
35
36
37 def _strlist(inp):
38     return [s.strip() for s in inp.split(',')]
39
40
41 def _pretty_json(inp):
42     return json.dumps(inp, indent=2)
43
44
45 def pytest_addoption(parser, pluginmanager):
46     parser.addoption('--nominatim-purge', dest='NOMINATIM_PURGE', action='store_true',
47                      help='Force recreation of test databases from scratch.')
48     parser.addoption('--nominatim-keep-db', dest='NOMINATIM_KEEP_DB', action='store_true',
49                      help='Do not drop the database after tests are finished.')
50     parser.addoption('--nominatim-api-engine', dest='NOMINATIM_API_ENGINE',
51                      default='falcon',
52                      help='Chose the API engine to use when sending requests.')
53     parser.addoption('--nominatim-tokenizer', dest='NOMINATIM_TOKENIZER',
54                      metavar='TOKENIZER',
55                      help='Use the specified tokenizer for importing data into '
56                           'a Nominatim database.')
57
58     parser.addini('nominatim_test_db', default='test_nominatim',
59                   help='Name of the database used for running a single test.')
60     parser.addini('nominatim_api_test_db', default='test_api_nominatim',
61                   help='Name of the database for storing API test data.')
62     parser.addini('nominatim_template_db', default='test_template_nominatim',
63                   help='Name of database used as a template for test databases.')
64
65
66 @pytest.fixture
67 def datatable():
68     """ Default fixture for datatables, so that their presence can be optional.
69     """
70     return None
71
72
73 @pytest.fixture
74 def node_grid():
75     """ Default fixture for node grids. Nothing set.
76     """
77     return Grid([[]], None, None)
78
79
80 @pytest.fixture(scope='session')
81 def template_db(pytestconfig):
82     """ Create a template database containing the extensions and base data
83         needed by Nominatim. Using the template instead of doing the full
84         setup can speed up the tests.
85
86         The template database will only be created if it does not exist yet
87         or a purge has been explicitly requested.
88     """
89     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
90
91     template_db = pytestconfig.getini('nominatim_template_db')
92
93     template_config = Configuration(
94         None, environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={template_db}"})
95
96     dbm.setup_template_db(template_config)
97
98     return template_db
99
100
101 @pytest.fixture
102 def def_config(pytestconfig):
103     dbname = pytestconfig.getini('nominatim_test_db')
104
105     return Configuration(None,
106                          environ={'NOMINATIM_DATABASE_DSN': f"pgsql:dbname={dbname}"})
107
108
109 @pytest.fixture
110 def db(template_db, pytestconfig):
111     """ Set up an empty database for use with osm2pgsql.
112     """
113     dbm = DBManager(purge=pytestconfig.option.NOMINATIM_PURGE)
114
115     dbname = pytestconfig.getini('nominatim_test_db')
116
117     dbm.create_db_from_template(dbname, template_db)
118
119     yield dbname
120
121     if not pytestconfig.option.NOMINATIM_KEEP_DB:
122         dbm.drop_db(dbname)
123
124
125 @pytest.fixture
126 def db_conn(def_config):
127     with psycopg.connect(def_config.get_libpq_dsn()) as conn:
128         info = psycopg.types.TypeInfo.fetch(conn, "hstore")
129         psycopg.types.hstore.register_hstore(info, conn)
130         yield conn
131
132
133 @when(step_parse(r'reverse geocoding (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)'),
134       target_fixture='nominatim_result')
135 def reverse_geocode_via_api(test_config_env, pytestconfig, datatable, lat, lon):
136     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
137     api_response = runner.run_step('reverse',
138                                    {'lat': float(lat), 'lon': float(lon)},
139                                    datatable, 'jsonv2', {})
140
141     assert api_response.status == 200
142     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
143
144     result = APIResult('json', 'reverse', api_response.body)
145     assert result.is_simple()
146
147     return result
148
149
150 @when(step_parse(r'geocoding(?: "(?P<query>.*)")?'),
151       target_fixture='nominatim_result')
152 def forward_geocode_via_api(test_config_env, pytestconfig, datatable, query):
153     runner = APIRunner(test_config_env, pytestconfig.option.NOMINATIM_API_ENGINE)
154
155     params = {'addressdetails': '1'}
156     if query:
157         params['q'] = query
158
159     api_response = runner.run_step('search', params, datatable, 'jsonv2', {})
160
161     assert api_response.status == 200
162     assert api_response.headers['content-type'] == 'application/json; charset=utf-8'
163
164     result = APIResult('json', 'search', api_response.body)
165     assert not result.is_simple()
166
167     return result
168
169
170 @then(step_parse(r'(?P<op>[a-z ]+) (?P<num>\d+) results? (?:are|is) returned'),
171       converters={'num': int})
172 def check_number_of_results(nominatim_result, op, num):
173     assert not nominatim_result.is_simple()
174     assert COMPARATOR_TERMS[op](num, len(nominatim_result))
175
176
177 @then(step_parse('the result metadata contains'))
178 def check_metadata_for_fields(nominatim_result, datatable):
179     if datatable[0] == ['param', 'value']:
180         pairs = datatable[1:]
181     else:
182         pairs = zip(datatable[0], datatable[1])
183
184     for k, v in pairs:
185         assert ResultAttr(nominatim_result.meta, k) == v
186
187
188 @then(step_parse('the result metadata has no attributes (?P<attributes>.*)'),
189       converters={'attributes': _strlist})
190 def check_metadata_for_field_presence(nominatim_result, attributes):
191     assert all(a not in nominatim_result.meta for a in attributes), \
192         f"Unexpectedly have one of the attributes '{attributes}' in\n" \
193         f"{_pretty_json(nominatim_result.meta)}"
194
195
196 @then(step_parse(r'the result contains(?: in field (?P<field>\S+))?'))
197 def check_result_for_fields(nominatim_result, datatable, field):
198     assert nominatim_result.is_simple()
199
200     if datatable[0] == ['param', 'value']:
201         pairs = datatable[1:]
202     else:
203         pairs = zip(datatable[0], datatable[1])
204
205     prefix = field + '+' if field else ''
206
207     for k, v in pairs:
208         assert ResultAttr(nominatim_result.result, prefix + k) == v
209
210
211 @then(step_parse('the result has attributes (?P<attributes>.*)'),
212       converters={'attributes': _strlist})
213 def check_result_for_field_presence(nominatim_result, attributes):
214     assert nominatim_result.is_simple()
215     assert all(a in nominatim_result.result for a in attributes)
216
217
218 @then(step_parse('the result has no attributes (?P<attributes>.*)'),
219       converters={'attributes': _strlist})
220 def check_result_for_field_absence(nominatim_result, attributes):
221     assert nominatim_result.is_simple()
222     assert all(a not in nominatim_result.result for a in attributes)
223
224
225 @then(step_parse('the result set contains(?P<exact> exactly)?'))
226 def check_result_list_match(nominatim_result, datatable, exact):
227     assert not nominatim_result.is_simple()
228
229     result_set = set(range(len(nominatim_result.result)))
230
231     for row in datatable[1:]:
232         for idx in result_set:
233             for key, value in zip(datatable[0], row):
234                 if ResultAttr(nominatim_result.result[idx], key) != value:
235                     break
236             else:
237                 # found a match
238                 result_set.remove(idx)
239                 break
240         else:
241             assert False, f"Missing data row {row}. Full response:\n{nominatim_result}"
242
243     if exact:
244         assert not [nominatim_result.result[i] for i in result_set]
245
246
247 @then(step_parse('all results have attributes (?P<attributes>.*)'),
248       converters={'attributes': _strlist})
249 def check_all_results_for_field_presence(nominatim_result, attributes):
250     assert not nominatim_result.is_simple()
251     for res in nominatim_result.result:
252         assert all(a in res for a in attributes), \
253             f"Missing one of the attributes '{attributes}' in\n{_pretty_json(res)}"
254
255
256 @then(step_parse('all results have no attributes (?P<attributes>.*)'),
257       converters={'attributes': _strlist})
258 def check_all_result_for_field_absence(nominatim_result, attributes):
259     assert not nominatim_result.is_simple()
260     for res in nominatim_result.result:
261         assert all(a not in res for a in attributes), \
262             f"Unexpectedly have one of the attributes '{attributes}' in\n{_pretty_json(res)}"
263
264
265 @then(step_parse(r'all results contain(?: in field (?P<field>\S+))?'))
266 def check_all_results_contain(nominatim_result, datatable, field):
267     assert not nominatim_result.is_simple()
268
269     if datatable[0] == ['param', 'value']:
270         pairs = datatable[1:]
271     else:
272         pairs = zip(datatable[0], datatable[1])
273
274     prefix = field + '+' if field else ''
275
276     for k, v in pairs:
277         for r in nominatim_result.result:
278             assert ResultAttr(r, prefix + k) == v
279
280
281 @then(step_parse(r'result (?P<num>\d+) contains(?: in field (?P<field>\S+))?'),
282       converters={'num': int})
283 def check_specific_result_for_fields(nominatim_result, datatable, num, field):
284     assert not nominatim_result.is_simple()
285     assert len(nominatim_result) >= num + 1
286
287     if datatable[0] == ['param', 'value']:
288         pairs = datatable[1:]
289     else:
290         pairs = zip(datatable[0], datatable[1])
291
292     prefix = field + '+' if field else ''
293
294     for k, v in pairs:
295         assert ResultAttr(nominatim_result.result[num], prefix + k) == v
296
297
298 @given(step_parse(r'the (?P<step>[0-9.]+ )?grid(?: with origin (?P<origin>.*))?'),
299        target_fixture='node_grid')
300 def set_node_grid(datatable, step, origin):
301     if step is not None:
302         step = float(step)
303
304     if origin:
305         if ',' in origin:
306             coords = origin.split(',')
307             if len(coords) != 2:
308                 raise RuntimeError('Grid origin expects origin with x,y coordinates.')
309             origin = list(map(float, coords))
310         elif origin in ALIASES:
311             origin = ALIASES[origin]
312         else:
313             raise RuntimeError('Grid origin must be either coordinate or alias.')
314
315     return Grid(datatable, step, origin)
316
317
318 @when('indexing')
319 def do_index(def_config):
320     """ Run Nominatim's indexing step.
321     """
322     cli.nominatim(['index'], def_config.environ)
323
324
325 @then(step_parse(r'(?P<table>\w+) contains(?P<exact> exactly)?'))
326 def check_place_content(db_conn, datatable, node_grid, table, exact):
327     check_table_content(db_conn, table, datatable, grid=node_grid, exact=bool(exact))
328
329
330 @then(step_parse('(?P<table>placex?) has no entry for '
331                  r'(?P<osm_type>[NRW])(?P<osm_id>\d+)(?::(?P<osm_class>\S+))?'),
332       converters={'osm_id': int})
333 def check_place_missing_lines(db_conn, table, osm_type, osm_id, osm_class):
334     check_table_has_lines(db_conn, table, osm_type, osm_id, osm_class)