3 from pathlib import Path
 
   8 SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
 
  10 # always test against the source
 
  11 sys.path.insert(0, str(SRC_DIR.resolve()))
 
  13 from nominatim.config import Configuration
 
  14 from nominatim.db import connection
 
  15 from nominatim.db.sql_preprocessor import SQLPreprocessor
 
  16 import nominatim.tokenizer.factory
 
  19 import dummy_tokenizer
 
  21 from cursor import CursorForTesting
 
  25 def temp_db(monkeypatch):
 
  26     """ Create an empty database for the test. The database name is also
 
  27         exported into NOMINATIM_DATABASE_DSN.
 
  29     name = 'test_nominatim_python_unittest'
 
  30     conn = psycopg2.connect(database='postgres')
 
  32     conn.set_isolation_level(0)
 
  33     with conn.cursor() as cur:
 
  34         cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
 
  35         cur.execute('CREATE DATABASE {}'.format(name))
 
  39     monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)
 
  43     conn = psycopg2.connect(database='postgres')
 
  45     conn.set_isolation_level(0)
 
  46     with conn.cursor() as cur:
 
  47         cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
 
  54     return 'dbname=' + temp_db
 
  58 def temp_db_with_extensions(temp_db):
 
  59     conn = psycopg2.connect(database=temp_db)
 
  60     with conn.cursor() as cur:
 
  61         cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
 
  68 def temp_db_conn(temp_db):
 
  69     """ Connection to the test database.
 
  71     with connection.connect('dbname=' + temp_db) as conn:
 
  76 def temp_db_cursor(temp_db):
 
  77     """ Connection and cursor towards the test database. The connection will
 
  78         be in auto-commit mode.
 
  80     conn = psycopg2.connect('dbname=' + temp_db)
 
  81     conn.set_isolation_level(0)
 
  82     with conn.cursor(cursor_factory=CursorForTesting) as cur:
 
  88 def table_factory(temp_db_cursor):
 
  89     """ A fixture that creates new SQL tables, potentially filled with
 
  92     def mk_table(name, definition='id INT', content=None):
 
  93         temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
 
  94         if content is not None:
 
  95             temp_db_cursor.execute_values("INSERT INTO {} VALUES %s".format(name), content)
 
 102     cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
 
 103     cfg.set_libdirs(module='.', osm2pgsql='.',
 
 104                     php=SRC_DIR / 'lib-php',
 
 105                     sql=SRC_DIR / 'lib-sql',
 
 106                     data=SRC_DIR / 'data')
 
 112     return SRC_DIR.resolve()
 
 117     def _call_nominatim(*args):
 
 118         return nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
 
 119                                        osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
 
 120                                        phplib_dir=str(SRC_DIR / 'lib-php'),
 
 121                                        data_dir=str(SRC_DIR / 'data'),
 
 122                                        phpcgi_path='/usr/bin/php-cgi',
 
 123                                        sqllib_dir=str(SRC_DIR / 'lib-sql'),
 
 124                                        config_dir=str(SRC_DIR / 'settings'),
 
 127     return _call_nominatim
 
 131 def property_table(table_factory, temp_db_conn):
 
 132     table_factory('nominatim_properties', 'property TEXT, value TEXT')
 
 134     return mocks.MockPropertyTable(temp_db_conn)
 
 138 def status_table(table_factory):
 
 139     """ Create an empty version of the status table and
 
 140         the status logging table.
 
 142     table_factory('import_status',
 
 143                   """lastimportdate timestamp with time zone NOT NULL,
 
 146     table_factory('import_osmosis_log',
 
 147                   """batchend timestamp,
 
 156 def place_table(temp_db_with_extensions, table_factory):
 
 157     """ Create an empty version of the place table.
 
 159     table_factory('place',
 
 160                   """osm_id int8 NOT NULL,
 
 161                      osm_type char(1) NOT NULL,
 
 165                      admin_level smallint,
 
 168                      geometry Geometry(Geometry,4326) NOT NULL""")
 
 172 def place_row(place_table, temp_db_cursor):
 
 173     """ A factory for rows in the place table. The table is created as a
 
 174         prerequisite to the fixture.
 
 176     psycopg2.extras.register_hstore(temp_db_cursor)
 
 177     idseq = itertools.count(1001)
 
 178     def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
 
 179                 admin_level=None, address=None, extratags=None, geom=None):
 
 180         temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
 
 181                                (osm_id or next(idseq), osm_type, cls, typ, names,
 
 182                                 admin_level, address, extratags,
 
 183                                 geom or 'SRID=4326;POINT(0 0)'))
 
 188 def placex_table(temp_db_with_extensions, temp_db_conn):
 
 189     """ Create an empty version of the place table.
 
 191     return mocks.MockPlacexTable(temp_db_conn)
 
 195 def osmline_table(temp_db_with_extensions, table_factory):
 
 196     table_factory('location_property_osmline',
 
 199                      parent_place_id BIGINT,
 
 200                      geometry_sector INTEGER,
 
 201                      indexed_date TIMESTAMP,
 
 205                      indexed_status SMALLINT,
 
 207                      interpolationtype TEXT,
 
 210                      country_code VARCHAR(2)""")
 
 214 def word_table(temp_db_conn):
 
 215     return mocks.MockWordTable(temp_db_conn)
 
 219 def osm2pgsql_options(temp_db):
 
 220     return dict(osm2pgsql='echo',
 
 222                 osm2pgsql_style='style.file',
 
 224                 dsn='dbname=' + temp_db,
 
 226                 tablespaces=dict(slim_data='', slim_index='',
 
 227                                  main_data='', main_index=''))
 
 231 def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions):
 
 232     table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
 
 233     cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
 
 234     cfg.set_libdirs(module='.', osm2pgsql='.', php=SRC_DIR / 'lib-php',
 
 235                     sql=tmp_path, data=SRC_DIR / 'data')
 
 240 def sql_preprocessor(sql_preprocessor_cfg, temp_db_conn):
 
 241     return SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg)
 
 245 def tokenizer_mock(monkeypatch, property_table):
 
 246     """ Sets up the configuration so that the test dummy tokenizer will be
 
 247         loaded when the tokenizer factory is used. Also returns a factory
 
 248         with which a new dummy tokenizer may be created.
 
 250     monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
 
 252     def _import_dummy(*args, **kwargs):
 
 253         return dummy_tokenizer
 
 255     monkeypatch.setattr(nominatim.tokenizer.factory, "_import_tokenizer", _import_dummy)
 
 256     property_table.set('tokenizer', 'dummy')
 
 258     def _create_tokenizer():
 
 259         return dummy_tokenizer.DummyTokenizer(None, None)
 
 261     return _create_tokenizer