1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
from pathlib import Path
import tempfile

import psycopg
from psycopg import sql as pysql

from nominatim_db import cli
from nominatim_db.config import Configuration
from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
from nominatim_db.tools import refresh
from nominatim_db.tokenizer import factory as tokenizer_factory
from steps.utils import run_script
 
  21 class NominatimEnvironment:
 
    """ Collects all functions for the execution of Nominatim functions.
    """
 
  25     def __init__(self, config):
 
  26         self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
 
  27         self.db_host = config['DB_HOST']
 
  28         self.db_port = config['DB_PORT']
 
  29         self.db_user = config['DB_USER']
 
  30         self.db_pass = config['DB_PASS']
 
  31         self.template_db = config['TEMPLATE_DB']
 
  32         self.test_db = config['TEST_DB']
 
  33         self.api_test_db = config['API_TEST_DB']
 
  34         self.api_test_file = config['API_TEST_FILE']
 
  35         self.tokenizer = config['TOKENIZER']
 
  36         self.import_style = config['STYLE']
 
  37         self.reuse_template = not config['REMOVE_TEMPLATE']
 
  38         self.keep_scenario_db = config['KEEP_TEST_DB']
 
  40         self.default_config = Configuration(None).get_os_env()
 
  42         self.template_db_done = False
 
  43         self.api_db_done = False
 
  44         self.website_dir = None
 
  46         if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
 
  47             raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
 
  48         self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
 
  50     def connect_database(self, dbname):
 
  51         """ Return a connection to the database with the given name.
 
  52             Uses configured host, user and port.
 
  54         dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
 
  56             dbargs['host'] = self.db_host
 
  58             dbargs['port'] = self.db_port
 
  60             dbargs['user'] = self.db_user
 
  62             dbargs['password'] = self.db_pass
 
  63         return psycopg.connect(**dbargs)
 
  66     def write_nominatim_config(self, dbname):
 
  67         """ Set up a custom test configuration that connects to the given
 
  68             database. This sets up the environment variables so that they can
 
  69             be picked up by dotenv and creates a project directory with the
 
  70             appropriate website scripts.
 
  72         if dbname.startswith('sqlite:'):
 
  73             dsn = 'sqlite:dbname={}'.format(dbname[7:])
 
  75             dsn = 'pgsql:dbname={}'.format(dbname)
 
  77             dsn += ';host=' + self.db_host
 
  79             dsn += ';port=' + self.db_port
 
  81             dsn += ';user=' + self.db_user
 
  83             dsn += ';password=' + self.db_pass
 
  85         self.test_env = dict(self.default_config)
 
  86         self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
 
  87         self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
 
  88         self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
 
  89         self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
 
  90         self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
 
  91         self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
 
  92         self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
 
  93         self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
 
  94         if self.tokenizer is not None:
 
  95             self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
 
  96         if self.import_style is not None:
 
  97             self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
 
  99         if self.website_dir is not None:
 
 100             self.website_dir.cleanup()
 
 102         self.website_dir = tempfile.TemporaryDirectory()
 
 105     def get_test_config(self):
 
 106         cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
 
 109     def get_libpq_dsn(self):
 
 110         dsn = self.test_env['NOMINATIM_DATABASE_DSN']
 
 112         def quote_param(param):
 
 113             key, val = param.split('=')
 
 114             val = val.replace('\\', '\\\\').replace("'", "\\'")
 
 116                 val = "'" + val + "'"
 
 117             return key + '=' + val
 
 119         if dsn.startswith('pgsql:'):
 
 120             # Old PHP DSN format. Convert before returning.
 
 121             return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
 
 126     def db_drop_database(self, name):
 
 127         """ Drop the database with the given name.
 
 129         with self.connect_database('postgres') as conn:
 
 130             conn.autocommit = True
 
 131             conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
 
 132                          +  pysql.Identifier(name))
 
 134     def setup_template_db(self):
 
 135         """ Setup a template database that already contains common test data.
 
 136             Having a template database speeds up tests considerably but at
 
 137             the price that the tests sometimes run with stale data.
 
 139         if self.template_db_done:
 
 142         self.template_db_done = True
 
 144         self.write_nominatim_config(self.template_db)
 
 146         if not self._reuse_or_drop_db(self.template_db):
 
 148                 # execute nominatim import on an empty file to get the right tables
 
 149                 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
 
 150                     fd.write(b'<osm version="0.6"></osm>')
 
 152                     self.run_nominatim('import', '--osm-file', fd.name,
 
 153                                                  '--osm2pgsql-cache', '1',
 
 155                                                  '--offline', '--index-noanalyse')
 
 157                 self.db_drop_database(self.template_db)
 
 160         self.run_nominatim('refresh', '--functions')
 
 163     def setup_api_db(self):
 
 164         """ Setup a test against the API test database.
 
 166         self.write_nominatim_config(self.api_test_db)
 
 168         if self.api_test_db.startswith('sqlite:'):
 
 171         if not self.api_db_done:
 
 172             self.api_db_done = True
 
 174             if not self._reuse_or_drop_db(self.api_test_db):
 
 175                 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
 
 176                 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
 
 177                 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
 
 178                 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
 
 181                     self.run_nominatim('import', '--osm-file', str(self.api_test_file))
 
 182                     self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
 
 183                     self.run_nominatim('freeze')
 
 185                     csv_path = str(testdata / 'full_en_phrases_test.csv')
 
 186                     self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
 
 188                     self.db_drop_database(self.api_test_db)
 
 191         tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
 
 194     def setup_unknown_db(self):
 
 195         """ Setup a test against a non-existing database.
 
 197         # The tokenizer needs an existing database to function.
 
 198         # So start with the usual database
 
 203         self.setup_db(context)
 
 204         tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
 
 206         # Then drop the DB again
 
 207         self.teardown_db(context, force_drop=True)
 
 209     def setup_db(self, context):
 
 210         """ Setup a test against a fresh, empty test database.
 
 212         self.setup_template_db()
 
 213         with self.connect_database(self.template_db) as conn:
 
 214             conn.autocommit = True
 
 215             conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
 
 216                                    + pysql.Identifier(self.test_db))
 
 217             conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
 
 218                            pysql.Identifier(self.test_db),
 
 219                            pysql.Identifier(self.template_db)))
 
 221         self.write_nominatim_config(self.test_db)
 
 222         context.db = self.connect_database(self.test_db)
 
 223         context.db.autocommit = True
 
 224         register_hstore(context.db)
 
 226     def teardown_db(self, context, force_drop=False):
 
 227         """ Remove the test database, if it exists.
 
 229         if hasattr(context, 'db'):
 
 232         if force_drop or not self.keep_scenario_db:
 
 233             self.db_drop_database(self.test_db)
 
 235     def _reuse_or_drop_db(self, name):
 
 236         """ Check for the existence of the given DB. If reuse is enabled,
 
 237             then the function checks for existnce and returns True if the
 
 238             database is already there. Otherwise an existing database is
 
 239             dropped and always false returned.
 
 241         if self.reuse_template:
 
 242             with self.connect_database('postgres') as conn:
 
 243                 num = execute_scalar(conn,
 
 244                                      'select count(*) from pg_database where datname = %s',
 
 249             self.db_drop_database(name)
 
 254     def reindex_placex(self, db):
 
 255         """ Run the indexing step until all data in the placex has
 
 256             been processed. Indexing during updates can produce more data
 
 257             to index under some circumstances. That is why indexing may have
 
 258             to be run multiple times.
 
 260         self.run_nominatim('index')
 
 263     def run_nominatim(self, *cmdline):
 
 264         """ Run the nominatim command-line tool via the library.
 
 266         if self.website_dir is not None:
 
 267             cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
 
 269         cli.nominatim(osm2pgsql_path=None,
 
 271                       environ=self.test_env)
 
 274     def copy_from_place(self, db):
 
 275         """ Copy data from place to the placex and location_property_osmline
 
 276             tables invoking the appropriate triggers.
 
 278         self.run_nominatim('refresh', '--functions', '--no-diff-updates')
 
 280         with db.cursor() as cur:
 
 281             cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
 
 282                                                name, admin_level, address,
 
 284                              SELECT osm_type, osm_id, class, type,
 
 285                                     name, admin_level, address,
 
 288                                WHERE not (class='place' and type='houses' and osm_type='W')""")
 
 289             cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
 
 290                              SELECT osm_id, address, geometry
 
 292                               WHERE class='place' and type='houses'
 
 294                                     and ST_GeometryType(geometry) = 'ST_LineString'""")
 
 297     def create_api_request_func_starlette(self):
 
 298         import nominatim_api.server.starlette.server
 
 299         from asgi_lifespan import LifespanManager
 
 302         async def _request(endpoint, params, project_dir, environ, http_headers):
 
 303             app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
 
 305             async with LifespanManager(app):
 
 306                 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
 
 307                     response = await client.get(f"/{endpoint}", params=params,
 
 308                                                 headers=http_headers)
 
 310             return response.text, response.status_code
 
 315     def create_api_request_func_falcon(self):
 
 316         import nominatim_api.server.falcon.server
 
 317         import falcon.testing
 
 319         async def _request(endpoint, params, project_dir, environ, http_headers):
 
 320             app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
 
 322             async with falcon.testing.ASGIConductor(app) as conductor:
 
 323                 response = await conductor.get(f"/{endpoint}", params=params,
 
 324                                                headers=http_headers)
 
 326             return response.text, response.status_code