1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Collector for BDD osm2pgsql import style tests.
 
  14 from pytest_bdd import scenarios, when, then, given
 
  15 from pytest_bdd.parsers import re as step_parse
 
  17 from nominatim_db import cli
 
  18 from nominatim_db.tools.exec_utils import run_osm2pgsql
 
  19 from nominatim_db.tools.database_import import load_data, create_table_triggers
 
  20 from nominatim_db.tools.replication import run_osm2pgsql_updates
 
  22 from utils.checks import check_table_content
 
  26 def osm2pgsql_options(def_config):
 
  27     return dict(osm2pgsql='osm2pgsql',
 
  29                 osm2pgsql_style=str(def_config.get_import_style_file()),
 
  30                 osm2pgsql_style_path=def_config.lib_dir.lua,
 
  32                 dsn=def_config.get_libpq_dsn(),
 
  34                 tablespaces=dict(slim_data='', slim_index='',
 
  35                                  main_data='', main_index=''),
 
  40 def opl_writer(tmp_path, node_grid):
 
  44         fname = tmp_path / f"test_osm_{nr[0]}.opl"
 
  46         with fname.open('wt') as fd:
 
  47             for line in data.split('\n'):
 
  48                 if line.startswith('n') and ' x' not in line:
 
  49                     coord = node_grid.get(line[1:].split(' ')[0]) \
 
  50                             or (random.uniform(-180, 180), random.uniform(-90, 90))
 
  51                     line = f"{line} x{coord[0]:.7f} y{coord[1]:.7f}"
 
  59 @given('the lua style file', target_fixture='osm2pgsql_options')
 
  60 def set_lua_style_file(osm2pgsql_options, docstring, tmp_path):
 
  61     style = tmp_path / 'custom.lua'
 
  62     style.write_text(docstring)
 
  63     osm2pgsql_options['osm2pgsql_style'] = str(style)
 
  65     return osm2pgsql_options
 
  68 @when('loading osm data')
 
  69 def load_from_osm_file(db, osm2pgsql_options, opl_writer, docstring):
 
  70     """ Load the given data into a freshly created test database using osm2pgsql.
 
  71         No further indexing is done.
 
  73         The data is expected as attached text in OPL format.
 
  75     osm2pgsql_options['import_file'] = opl_writer(docstring.replace(r'//', r'/'))
 
  76     osm2pgsql_options['append'] = False
 
  77     run_osm2pgsql(osm2pgsql_options)
 
  80 @when('updating osm data')
 
  81 def update_from_osm_file(db_conn, def_config, osm2pgsql_options, opl_writer, docstring):
 
  82     """ Update a database previously populated with 'loading osm data'.
 
  83         Needs to run indexing on the existing data first to yield the correct
 
  86         The data is expected as attached text in OPL format.
 
  88     create_table_triggers(db_conn, def_config)
 
  89     asyncio.run(load_data(def_config.get_libpq_dsn(), 1))
 
  90     cli.nominatim(['index'], def_config.environ)
 
  91     cli.nominatim(['refresh', '--functions'], def_config.environ)
 
  93     osm2pgsql_options['import_file'] = opl_writer(docstring.replace(r'//', r'/'))
 
  94     run_osm2pgsql_updates(db_conn, osm2pgsql_options)
 
  98 def do_index(def_config):
 
  99     """ Run Nominatim's indexing step.
 
 101     cli.nominatim(['index'], def_config.environ)
 
 104 @then(step_parse(r'(?P<table>\w+) contains(?P<exact> exactly)?'))
 
 105 def check_place_content(db_conn, datatable, node_grid, table, exact):
 
 106     check_table_content(db_conn, table, datatable, grid=node_grid, exact=bool(exact))
 
 109 scenarios('features/osm2pgsql')