1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 import psycopg2.extras
14 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
16 from nominatim import cli
17 from nominatim.config import Configuration
18 from nominatim.db.connection import Connection
19 from nominatim.tools import refresh
20 from nominatim.tokenizer import factory as tokenizer_factory
21 from steps.utils import run_script
23 class NominatimEnvironment:
24 """ Collects all functions for the execution of Nominatim functions.
27 def __init__(self, config):
28 self.build_dir = Path(config['BUILDDIR']).resolve()
29 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
30 self.db_host = config['DB_HOST']
31 self.db_port = config['DB_PORT']
32 self.db_user = config['DB_USER']
33 self.db_pass = config['DB_PASS']
34 self.template_db = config['TEMPLATE_DB']
35 self.test_db = config['TEST_DB']
36 self.api_test_db = config['API_TEST_DB']
37 self.api_test_file = config['API_TEST_FILE']
38 self.tokenizer = config['TOKENIZER']
39 self.import_style = config['STYLE']
40 self.server_module_path = config['SERVER_MODULE_PATH']
41 self.reuse_template = not config['REMOVE_TEMPLATE']
42 self.keep_scenario_db = config['KEEP_TEST_DB']
43 self.code_coverage_path = config['PHPCOV']
44 self.code_coverage_id = 1
46 self.default_config = Configuration(None).get_os_env()
48 self.template_db_done = False
49 self.api_db_done = False
50 self.website_dir = None
52 def connect_database(self, dbname):
53 """ Return a connection to the database with the given name.
54 Uses configured host, user and port.
56 dbargs = {'database': dbname}
58 dbargs['host'] = self.db_host
60 dbargs['port'] = self.db_port
62 dbargs['user'] = self.db_user
64 dbargs['password'] = self.db_pass
65 conn = psycopg2.connect(connection_factory=Connection, **dbargs)
68 def next_code_coverage_file(self):
69 """ Generate the next name for a coverage file.
71 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
72 self.code_coverage_id += 1
76 def write_nominatim_config(self, dbname):
77 """ Set up a custom test configuration that connects to the given
78 database. This sets up the environment variables so that they can
79 be picked up by dotenv and creates a project directory with the
80 appropriate website scripts.
82 dsn = 'pgsql:dbname={}'.format(dbname)
84 dsn += ';host=' + self.db_host
86 dsn += ';port=' + self.db_port
88 dsn += ';user=' + self.db_user
90 dsn += ';password=' + self.db_pass
92 self.test_env = dict(self.default_config)
93 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
94 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
95 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
96 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
97 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
98 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
99 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
100 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
101 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
102 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
103 if self.tokenizer is not None:
104 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
105 if self.import_style is not None:
106 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
108 if self.server_module_path:
109 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
111 # avoid module being copied into the temporary environment
112 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
114 if self.website_dir is not None:
115 self.website_dir.cleanup()
117 self.website_dir = tempfile.TemporaryDirectory()
120 conn = self.connect_database(dbname)
123 refresh.setup_website(Path(self.website_dir.name) / 'website',
124 self.get_test_config(), conn)
127 def get_test_config(self):
128 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
129 cfg.set_libdirs(module=self.build_dir / 'module',
130 osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql')
133 def get_libpq_dsn(self):
134 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
136 def quote_param(param):
137 key, val = param.split('=')
138 val = val.replace('\\', '\\\\').replace("'", "\\'")
140 val = "'" + val + "'"
141 return key + '=' + val
143 if dsn.startswith('pgsql:'):
144 # Old PHP DSN format. Convert before returning.
145 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
150 def db_drop_database(self, name):
151 """ Drop the database with the given name.
153 conn = self.connect_database('postgres')
154 conn.set_isolation_level(0)
156 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
159 def setup_template_db(self):
160 """ Setup a template database that already contains common test data.
161 Having a template database speeds up tests considerably but at
162 the price that the tests sometimes run with stale data.
164 if self.template_db_done:
167 self.template_db_done = True
169 self.write_nominatim_config(self.template_db)
171 if not self._reuse_or_drop_db(self.template_db):
173 # execute nominatim import on an empty file to get the right tables
174 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
175 fd.write(b'<osm version="0.6"></osm>')
177 self.run_nominatim('import', '--osm-file', fd.name,
178 '--osm2pgsql-cache', '1',
180 '--offline', '--index-noanalyse')
182 self.db_drop_database(self.template_db)
185 self.run_nominatim('refresh', '--functions')
188 def setup_api_db(self):
189 """ Setup a test against the API test database.
191 self.write_nominatim_config(self.api_test_db)
193 if not self.api_db_done:
194 self.api_db_done = True
196 if not self._reuse_or_drop_db(self.api_test_db):
197 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
198 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
199 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
200 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
203 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
204 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
205 self.run_nominatim('freeze')
207 if self.tokenizer == 'legacy':
208 phrase_file = str(testdata / 'specialphrases_testdb.sql')
209 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
211 csv_path = str(testdata / 'full_en_phrases_test.csv')
212 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
214 self.db_drop_database(self.api_test_db)
217 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
220 def setup_unknown_db(self):
221 """ Setup a test against a non-existing database.
223 # The tokenizer needs an existing database to function.
224 # So start with the usual database
229 self.setup_db(context)
230 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
232 # Then drop the DB again
233 self.teardown_db(context, force_drop=True)
235 def setup_db(self, context):
236 """ Setup a test against a fresh, empty test database.
238 self.setup_template_db()
239 conn = self.connect_database(self.template_db)
240 conn.set_isolation_level(0)
242 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
243 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
245 self.write_nominatim_config(self.test_db)
246 context.db = self.connect_database(self.test_db)
247 context.db.autocommit = True
248 psycopg2.extras.register_hstore(context.db, globally=False)
250 def teardown_db(self, context, force_drop=False):
251 """ Remove the test database, if it exists.
253 if hasattr(context, 'db'):
256 if force_drop or not self.keep_scenario_db:
257 self.db_drop_database(self.test_db)
259 def _reuse_or_drop_db(self, name):
260 """ Check for the existance of the given DB. If reuse is enabled,
261 then the function checks for existance and returns True if the
262 database is already there. Otherwise an existing database is
263 dropped and always false returned.
265 if self.reuse_template:
266 conn = self.connect_database('postgres')
267 with conn.cursor() as cur:
268 cur.execute('select count(*) from pg_database where datname = %s',
270 if cur.fetchone()[0] == 1:
274 self.db_drop_database(name)
278 def reindex_placex(self, db):
279 """ Run the indexing step until all data in the placex has
280 been processed. Indexing during updates can produce more data
281 to index under some circumstances. That is why indexing may have
282 to be run multiple times.
284 with db.cursor() as cur:
286 self.run_nominatim('index')
288 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
289 if cur.rowcount == 0:
292 def run_nominatim(self, *cmdline):
293 """ Run the nominatim command-line tool via the library.
295 if self.website_dir is not None:
296 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
298 cli.nominatim(module_dir='',
299 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
302 environ=self.test_env)
305 def copy_from_place(self, db):
306 """ Copy data from place to the placex and location_property_osmline
307 tables invoking the appropriate triggers.
309 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
311 with db.cursor() as cur:
312 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
313 name, admin_level, address,
315 SELECT osm_type, osm_id, class, type,
316 name, admin_level, address,
319 WHERE not (class='place' and type='houses' and osm_type='W')""")
320 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
321 SELECT osm_id, address, geometry
323 WHERE class='place' and type='houses'
325 and ST_GeometryType(geometry) = 'ST_LineString'""")