1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2025 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Collector for BDD import acceptance tests.
 
  10 These tests check the Nominatim import chain after the osm2pgsql import.
 
  18 from pytest_bdd import scenarios, when, then, given
 
  19 from pytest_bdd.parsers import re as step_parse
 
  21 from utils.place_inserter import PlaceColumn
 
  22 from utils.checks import check_table_content
 
  24 from nominatim_db.config import Configuration
 
  25 from nominatim_db import cli
 
  26 from nominatim_db.tools.database_import import load_data, create_table_triggers
 
  27 from nominatim_db.tools.postcodes import update_postcodes
 
  28 from nominatim_db.tokenizer import factory as tokenizer_factory
 
  31 def _rewrite_placeid_field(field, new_field, datatable, place_ids):
 
  33         oidx = datatable[0].index(field)
 
  34         datatable[0][oidx] = new_field
 
  35         for line in datatable[1:]:
 
  36             line[oidx] = None if line[oidx] == '-' else place_ids[line[oidx]]
 
  41 def _collect_place_ids(conn):
 
  43     with conn.cursor() as cur:
 
  44         for row in cur.execute('SELECT place_id, osm_type, osm_id, class FROM placex'):
 
  45             pids[f"{row[1]}{row[2]}"] = row[0]
 
  46             pids[f"{row[1]}{row[2]}:{row[3]}"] = row[0]
 
  52 def test_config_env(pytestconfig):
 
  53     dbname = pytestconfig.getini('nominatim_test_db')
 
  55     config = Configuration(None).get_os_env()
 
  56     config['NOMINATIM_DATABASE_DSN'] = f"pgsql:dbname={dbname}"
 
  57     config['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
 
  58     config['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
 
  59     if pytestconfig.option.NOMINATIM_TOKENIZER is not None:
 
  60         config['NOMINATIM_TOKENIZER'] = pytestconfig.option.NOMINATIM_TOKENIZER
 
  66 def update_config(def_config):
 
  67     """ Prepare the database for being updatable and return the config.
 
  69     cli.nominatim(['refresh', '--functions'], def_config.environ)
 
  74 @given(step_parse('the (?P<named>named )?places'), target_fixture=None)
 
  75 def import_places(db_conn, named, datatable, node_grid):
 
  76     """ Insert todo rows into the place table.
 
  77         When 'named' is given, then a random name will be generated for all
 
  80     with db_conn.cursor() as cur:
 
  81         for row in datatable[1:]:
 
  82             PlaceColumn(node_grid).add_row(datatable[0], row, named is not None).db_insert(cur)
 
  85 @given(step_parse('the entrances'), target_fixture=None)
 
  86 def import_place_entrances(db_conn, datatable, node_grid):
 
  87     """ Insert todo rows into the place_entrance table.
 
  89     with db_conn.cursor() as cur:
 
  90         for row in datatable[1:]:
 
  91             data = PlaceColumn(node_grid).add_row(datatable[0], row, False)
 
  92             assert data.columns['osm_type'] == 'N'
 
  94             cur.execute("""INSERT INTO place_entrance (osm_id, type, extratags, geometry)
 
  95                            VALUES (%s, %s, %s, {})""".format(data.get_wkt()),
 
  96                         (data.columns['osm_id'], data.columns['type'],
 
  97                          data.columns.get('extratags')))
 
 100 @given('the ways', target_fixture=None)
 
 101 def import_ways(db_conn, datatable):
 
 102     """ Import raw ways into the osm2pgsql way middle table.
 
 104     with db_conn.cursor() as cur:
 
 105         id_idx = datatable[0].index('id')
 
 106         node_idx = datatable[0].index('nodes')
 
 107         for line in datatable[1:]:
 
 108             tags = psycopg.types.json.Json(
 
 109                 {k[5:]: v for k, v in zip(datatable[0], line)
 
 110                  if k.startswith("tags+")})
 
 111             nodes = [int(x) for x in line[node_idx].split(',')]
 
 113             cur.execute("INSERT INTO planet_osm_ways (id, nodes, tags) VALUES (%s, %s, %s)",
 
 114                         (line[id_idx], nodes, tags))
 
 117 @given('the relations', target_fixture=None)
 
 118 def import_rels(db_conn, datatable):
 
 119     """ Import raw relations into the osm2pgsql relation middle table.
 
 121     with db_conn.cursor() as cur:
 
 122         id_idx = datatable[0].index('id')
 
 123         memb_idx = datatable[0].index('members')
 
 124         for line in datatable[1:]:
 
 125             tags = psycopg.types.json.Json(
 
 126                 {k[5:]: v for k, v in zip(datatable[0], line)
 
 127                  if k.startswith("tags+")})
 
 130                 for member in line[memb_idx].split(','):
 
 131                     m = re.fullmatch(r'\s*([RWN])(\d+)(?::(\S+))?\s*', member)
 
 133                         raise ValueError(f'Illegal member {member}.')
 
 134                     members.append({'ref': int(m[2]), 'role': m[3] or '', 'type': m[1]})
 
 136             cur.execute('INSERT INTO planet_osm_rels (id, tags, members) VALUES (%s, %s, %s)',
 
 137                         (int(line[id_idx]), tags, psycopg.types.json.Json(members)))
 
 140 @when('importing', target_fixture='place_ids')
 
 141 def do_import(db_conn, def_config):
 
 142     """ Run a reduced version of the Nominatim import.
 
 144     create_table_triggers(db_conn, def_config)
 
 145     asyncio.run(load_data(def_config.get_libpq_dsn(), 1))
 
 146     tokenizer = tokenizer_factory.get_tokenizer_for_db(def_config)
 
 147     update_postcodes(def_config.get_libpq_dsn(), None, tokenizer)
 
 148     cli.nominatim(['index', '-q'], def_config.environ)
 
 150     return _collect_place_ids(db_conn)
 
 153 @when('updating places', target_fixture='place_ids')
 
 154 def do_update(db_conn, update_config, node_grid, datatable):
 
 155     """ Update the place table with the given data. Also runs all triggers
 
 156         related to updates and reindexes the new data.
 
 158     with db_conn.cursor() as cur:
 
 159         for row in datatable[1:]:
 
 160             PlaceColumn(node_grid).add_row(datatable[0], row, False).db_insert(cur)
 
 161         cur.execute('SELECT flush_deleted_places()')
 
 164     cli.nominatim(['index', '-q'], update_config.environ)
 
 166     return _collect_place_ids(db_conn)
 
 169 @when('updating entrances', target_fixture=None)
 
 170 def update_place_entrances(db_conn, datatable, node_grid):
 
 171     """ Insert todo rows into the place_entrance table.
 
 173     with db_conn.cursor() as cur:
 
 174         for row in datatable[1:]:
 
 175             data = PlaceColumn(node_grid).add_row(datatable[0], row, False)
 
 176             assert data.columns['osm_type'] == 'N'
 
 178             cur.execute("DELETE FROM place_entrance WHERE osm_id = %s",
 
 179                         (data.columns['osm_id'],))
 
 180             cur.execute("""INSERT INTO place_entrance (osm_id, type, extratags, geometry)
 
 181                            VALUES (%s, %s, %s, {})""".format(data.get_wkt()),
 
 182                         (data.columns['osm_id'], data.columns['type'],
 
 183                          data.columns.get('extratags')))
 
 186 @when('updating postcodes')
 
 187 def do_postcode_update(update_config):
 
 188     """ Recompute the postcode centroids.
 
 190     cli.nominatim(['refresh', '--postcodes'], update_config.environ)
 
 193 @when(step_parse(r'marking for delete (?P<otype>[NRW])(?P<oid>\d+)'),
 
 194       converters={'oid': int})
 
 195 def do_delete_place(db_conn, update_config, node_grid, otype, oid):
 
 196     """ Remove the given place from the database.
 
 198     with db_conn.cursor() as cur:
 
 199         cur.execute('TRUNCATE place_to_be_deleted')
 
 200         cur.execute('DELETE FROM place WHERE osm_type = %s and osm_id = %s',
 
 202         cur.execute('SELECT flush_deleted_places()')
 
 204             cur.execute('DELETE FROM place_entrance WHERE osm_id = %s',
 
 208     cli.nominatim(['index', '-q'], update_config.environ)
 
 211 @then(step_parse(r'(?P<table>\w+) contains(?P<exact> exactly)?'))
 
 212 def then_check_table_content(db_conn, place_ids, datatable, node_grid, table, exact):
 
 213     _rewrite_placeid_field('object', 'place_id', datatable, place_ids)
 
 214     _rewrite_placeid_field('parent_place_id', 'parent_place_id', datatable, place_ids)
 
 215     _rewrite_placeid_field('linked_place_id', 'linked_place_id', datatable, place_ids)
 
 216     if table == 'place_addressline':
 
 217         _rewrite_placeid_field('address', 'address_place_id', datatable, place_ids)
 
 219     for i, title in enumerate(datatable[0]):
 
 220         if title.startswith('addr+'):
 
 221             datatable[0][i] = f"address+{title[5:]}"
 
 223     check_table_content(db_conn, table, datatable, grid=node_grid, exact=bool(exact))
 
 226 @then(step_parse(r'(DISABLED?P<table>placex?) has no entry for (?P<oid>[NRW]\d+(?::\S+)?)'))
 
 227 def then_check_place_missing_lines(db_conn, place_ids, table, oid):
 
 228     assert oid in place_ids
 
 230     sql = pysql.SQL("""SELECT count(*) FROM {}
 
 231                        WHERE place_id = %s""").format(pysql.Identifier(tablename))
 
 233     with conn.cursor(row_factory=tuple_row) as cur:
 
 234         assert cur.execute(sql, [place_ids[oid]]).fetchone()[0] == 0
 
 237 @then(step_parse(r'W(?P<oid>\d+) expands to interpolation'),
 
 238       converters={'oid': int})
 
 239 def then_check_interpolation_table(db_conn, node_grid, place_ids, oid, datatable):
 
 240     with db_conn.cursor() as cur:
 
 241         cur.execute('SELECT count(*) FROM location_property_osmline WHERE osm_id = %s',
 
 243         assert cur.fetchone()[0] == len(datatable) - 1
 
 245     converted = [['osm_id', 'startnumber', 'endnumber', 'linegeo!wkt']]
 
 246     start_idx = datatable[0].index('start') if 'start' in datatable[0] else None
 
 247     end_idx = datatable[0].index('end') if 'end' in datatable[0] else None
 
 248     geom_idx = datatable[0].index('geometry') if 'geometry' in datatable[0] else None
 
 249     converted = [['osm_id']]
 
 250     for val, col in zip((start_idx, end_idx, geom_idx),
 
 251                         ('startnumber', 'endnumber', 'linegeo!wkt')):
 
 253             converted[0].append(col)
 
 255     for line in datatable[1:]:
 
 257         for val in (start_idx, end_idx):
 
 259                 convline.append(line[val])
 
 260         if geom_idx is not None:
 
 261             convline.append(line[geom_idx])
 
 262         converted.append(convline)
 
 264     _rewrite_placeid_field('parent_place_id', 'parent_place_id', converted, place_ids)
 
 266     check_table_content(db_conn, 'location_property_osmline', converted, grid=node_grid)
 
 269 @then(step_parse(r'W(?P<oid>\d+) expands to no interpolation'),
 
 270       converters={'oid': int})
 
 271 def then_check_interpolation_table_negative(db_conn, oid):
 
 272     with db_conn.cursor() as cur:
 
 273         cur.execute("""SELECT count(*) FROM location_property_osmline
 
 274                        WHERE osm_id = %s and startnumber is not null""",
 
 276         assert cur.fetchone()[0] == 0
 
 279 scenarios('features/db')