2 Functions for setting up and importing a new Nominatim database.
 
   9 from pathlib import Path
 
  14 from ..db.connection import connect, get_pg_env
 
  15 from ..db import utils as db_utils
 
  16 from ..db.async_connection import DBConnection
 
  17 from ..db.sql_preprocessor import SQLPreprocessor
 
  18 from .exec_utils import run_osm2pgsql
 
  19 from ..errors import UsageError
 
  20 from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
 
  22 LOG = logging.getLogger()
 
  24 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
 
  25     """ Create a new database for Nominatim and populate it with the
 
  26         essential extensions and data.
 
  28     LOG.warning('Creating database')
 
  29     create_db(dsn, rouser)
 
  31     LOG.warning('Setting up database')
 
  32     with connect(dsn) as conn:
 
  33         setup_extensions(conn)
 
  35     LOG.warning('Loading basic data')
 
  36     import_base_data(dsn, data_dir, no_partitions)
 
  39 def create_db(dsn, rouser=None):
 
  40     """ Create a new database for the given DSN. Fails when the database
 
  41         already exists or the PostgreSQL version is too old.
 
  42         Uses `createdb` to create the database.
 
  44         If 'rouser' is given, then the function also checks that the user
 
  45         with that given name exists.
 
  47         Requires superuser rights by the caller.
 
  49     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
 
  51     if proc.returncode != 0:
 
  52         raise UsageError('Creating new database failed.')
 
  54     with connect(dsn) as conn:
 
  55         postgres_version = conn.server_version_tuple()
 
  56         if postgres_version < POSTGRESQL_REQUIRED_VERSION:
 
  57             LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
 
  58                       'Found version %d.%d.',
 
  59                       POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
 
  60                       postgres_version[0], postgres_version[1])
 
  61             raise UsageError('PostgreSQL server is too old.')
 
  63         if rouser is not None:
 
  64             with conn.cursor() as cur:
 
  65                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
 
  68                     LOG.fatal("Web user '%s' does not exists. Create it with:\n"
 
  69                               "\n      createuser %s", rouser, rouser)
 
  70                     raise UsageError('Missing read-only user.')
 
  74 def setup_extensions(conn):
 
  75     """ Set up all extensions needed for Nominatim. Also checks that the
 
  76         versions of the extensions are sufficient.
 
  78     with conn.cursor() as cur:
 
  79         cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
 
  80         cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
 
  83     postgis_version = conn.postgis_version_tuple()
 
  84     if postgis_version < POSTGIS_REQUIRED_VERSION:
 
  85         LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
 
  86                   'Found version %d.%d.',
 
  87                   POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
 
  88                   postgis_version[0], postgis_version[1])
 
  89         raise UsageError('PostGIS version is too old.')
 
  92 def install_module(src_dir, project_dir, module_dir, conn=None):
 
  93     """ Copy the normalization module from src_dir into the project
 
  94         directory under the '/module' directory. If 'module_dir' is set, then
 
  95         use the module from there instead and check that it is accessible
 
  98         The function detects when the installation is run from the
 
  99         build directory. It doesn't touch the module in that case.
 
 101         If 'conn' is given, then the function also tests if the module
 
 102         can be access via the given database.
 
 105         module_dir = project_dir / 'module'
 
 107         if not module_dir.exists() or not src_dir.samefile(module_dir):
 
 109             if not module_dir.exists():
 
 112             destfile = module_dir / 'nominatim.so'
 
 113             shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
 
 114             destfile.chmod(0o755)
 
 116             LOG.info('Database module installed at %s', str(destfile))
 
 118             LOG.info('Running from build directory. Leaving database module as is.')
 
 120         LOG.info("Using custom path for database module at '%s'", module_dir)
 
 123         with conn.cursor() as cur:
 
 125                 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
 
 126                                RETURNS text AS '{}/nominatim.so', 'transliteration'
 
 127                                LANGUAGE c IMMUTABLE STRICT;
 
 128                                DROP FUNCTION nominatim_test_import_func(text)
 
 129                             """.format(module_dir))
 
 130             except psycopg2.DatabaseError as err:
 
 131                 LOG.fatal("Error accessing database module: %s", err)
 
 132                 raise UsageError("Database module cannot be accessed.") from err
 
 135 def import_base_data(dsn, sql_dir, ignore_partitions=False):
 
 136     """ Create and populate the tables with basic static data that provides
 
 137         the background for geocoding. Data is assumed to not yet exist.
 
 139     db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
 
 140     db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
 
 142     if ignore_partitions:
 
 143         with connect(dsn) as conn:
 
 144             with conn.cursor() as cur:
 
 145                 cur.execute('UPDATE country_name SET partition = 0')
 
 149 def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
 
 150     """ Import the given OSM file. 'options' contains the list of
 
 151         default settings for osm2pgsql.
 
 153     options['import_file'] = osm_file
 
 154     options['append'] = False
 
 155     options['threads'] = 1
 
 157     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
 
 158         # Make some educated guesses about cache size based on the size
 
 159         # of the import file and the available memory.
 
 160         mem = psutil.virtual_memory()
 
 161         fsize = os.stat(str(osm_file)).st_size
 
 162         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
 
 163                                              fsize * 2) / 1024 / 1024) + 1
 
 165     run_osm2pgsql(options)
 
 167     with connect(options['dsn']) as conn:
 
 168         if not ignore_errors:
 
 169             with conn.cursor() as cur:
 
 170                 cur.execute('SELECT * FROM place LIMIT 1')
 
 171                 if cur.rowcount == 0:
 
 172                     raise UsageError('No data imported by osm2pgsql.')
 
 175             conn.drop_table('planet_osm_nodes')
 
 178         if options['flatnode_file']:
 
 179             Path(options['flatnode_file']).unlink()
 
 182 def create_tables(conn, config, sqllib_dir, reverse_only=False):
 
 183     """ Create the set of basic tables.
 
 184         When `reverse_only` is True, then the main table for searching will
 
 185         be skipped and only reverse search is possible.
 
 187     sql = SQLPreprocessor(conn, config, sqllib_dir)
 
 188     sql.env.globals['db']['reverse_only'] = reverse_only
 
 190     sql.run_sql_file(conn, 'tables.sql')
 
 193 def create_table_triggers(conn, config, sqllib_dir):
 
 194     """ Create the triggers for the tables. The trigger functions must already
 
 195         have been imported with refresh.create_functions().
 
 197     sql = SQLPreprocessor(conn, config, sqllib_dir)
 
 198     sql.run_sql_file(conn, 'table-triggers.sql')
 
 201 def create_partition_tables(conn, config, sqllib_dir):
 
 202     """ Create tables that have explicit partitioning.
 
 204     sql = SQLPreprocessor(conn, config, sqllib_dir)
 
 205     sql.run_sql_file(conn, 'partition-tables.src.sql')
 
 208 def truncate_data_tables(conn, max_word_frequency=None):
 
 209     """ Truncate all data tables to prepare for a fresh load.
 
 211     with conn.cursor() as cur:
 
 212         cur.execute('TRUNCATE word')
 
 213         cur.execute('TRUNCATE placex')
 
 214         cur.execute('TRUNCATE place_addressline')
 
 215         cur.execute('TRUNCATE location_area')
 
 216         cur.execute('TRUNCATE location_area_country')
 
 217         cur.execute('TRUNCATE location_property')
 
 218         cur.execute('TRUNCATE location_property_tiger')
 
 219         cur.execute('TRUNCATE location_property_osmline')
 
 220         cur.execute('TRUNCATE location_postcode')
 
 221         if conn.table_exists('search_name'):
 
 222             cur.execute('TRUNCATE search_name')
 
 223         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
 
 224         cur.execute('CREATE SEQUENCE seq_place start 100000')
 
 226         cur.execute("""SELECT tablename FROM pg_tables
 
 227                        WHERE tablename LIKE 'location_road_%'""")
 
 229         for table in [r[0] for r in list(cur)]:
 
 230             cur.execute('TRUNCATE ' + table)
 
 232         if max_word_frequency is not None:
 
 233             # Used by getorcreate_word_id to ignore frequent partial words.
 
 234             cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
 
 235                            RETURNS integer AS $$
 
 236                              SELECT {} as maxwordfreq;
 
 237                            $$ LANGUAGE SQL IMMUTABLE
 
 238                         """.format(max_word_frequency))
 
 241 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
 
 243 def load_data(dsn, data_dir, threads):
 
 244     """ Copy data into the word and placex table.
 
 246     # Pre-calculate the most important terms in the word list.
 
 247     db_utils.execute_file(dsn, data_dir / 'words.sql')
 
 249     sel = selectors.DefaultSelector()
 
 250     # Then copy data from place to placex in <threads - 1> chunks.
 
 251     place_threads = max(1, threads - 1)
 
 252     for imod in range(place_threads):
 
 253         conn = DBConnection(dsn)
 
 255         conn.perform("""INSERT INTO placex ({0})
 
 256                          SELECT {0} FROM place
 
 257                          WHERE osm_id % {1} = {2}
 
 258                            AND NOT (class='place' and type='houses')
 
 259                            AND ST_IsValid(geometry)
 
 260                      """.format(_COPY_COLUMNS, place_threads, imod))
 
 261         sel.register(conn, selectors.EVENT_READ, conn)
 
 263     # Address interpolations go into another table.
 
 264     conn = DBConnection(dsn)
 
 266     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
 
 267                       SELECT osm_id, address, geometry FROM place
 
 268                       WHERE class='place' and type='houses' and osm_type='W'
 
 269                             and ST_GeometryType(geometry) = 'ST_LineString'
 
 271     sel.register(conn, selectors.EVENT_READ, conn)
 
 273     # Now wait for all of them to finish.
 
 274     todo = place_threads + 1
 
 276         for key, _ in sel.select(1):
 
 282         print('.', end='', flush=True)
 
 285     with connect(dsn) as conn:
 
 286         with conn.cursor() as cur:
 
 287             cur.execute('ANALYSE')
 
 290 def create_search_indices(conn, config, sqllib_dir, drop=False):
 
 291     """ Create tables that have explicit partitioning.
 
 294     # If index creation failed and left an index invalid, they need to be
 
 295     # cleaned out first, so that the script recreates them.
 
 296     with conn.cursor() as cur:
 
 297         cur.execute("""SELECT relname FROM pg_class, pg_index
 
 298                        WHERE pg_index.indisvalid = false
 
 299                              AND pg_index.indexrelid = pg_class.oid""")
 
 300         bad_indices = [row[0] for row in list(cur)]
 
 301         for idx in bad_indices:
 
 302             LOG.info("Drop invalid index %s.", idx)
 
 303             cur.execute('DROP INDEX "{}"'.format(idx))
 
 306     sql = SQLPreprocessor(conn, config, sqllib_dir)
 
 308     sql.run_sql_file(conn, 'indices.sql', drop=drop)
 
 310 def create_country_names(conn, config):
 
 311     """ Create search index for default country names.
 
 314     with conn.cursor() as cur:
 
 315         cur.execute("""SELECT getorcreate_country(make_standard_name('uk'), 'gb')""")
 
 316         cur.execute("""SELECT getorcreate_country(make_standard_name('united states'), 'us')""")
 
 317         cur.execute("""SELECT COUNT(*) FROM
 
 318                        (SELECT getorcreate_country(make_standard_name(country_code),
 
 319                        country_code) FROM country_name WHERE country_code is not null) AS x""")
 
 320         cur.execute("""SELECT COUNT(*) FROM
 
 321                        (SELECT getorcreate_country(make_standard_name(name->'name'), country_code) 
 
 322                        FROM country_name WHERE name ? 'name') AS x""")
 
 323         sql_statement = """SELECT COUNT(*) FROM (SELECT getorcreate_country(make_standard_name(v),
 
 324                            country_code) FROM (SELECT country_code, skeys(name)
 
 325                            AS k, svals(name) AS v FROM country_name) x WHERE k"""
 
 327         languages = config.LANGUAGES
 
 330             sql_statement = "{} IN (".format(sql_statement)
 
 332             for language in languages.split(','):
 
 333                 sql_statement = "{}{}'name:{}'".format(sql_statement, delim, language)
 
 335             sql_statement = '{})'.format(sql_statement)
 
 337             sql_statement = "{} LIKE 'name:%'".format(sql_statement)
 
 338         sql_statement = "{}) v".format(sql_statement)
 
 339         cur.execute(sql_statement)