2 Functions for setting up and importing a new Nominatim database.
 
   9 from pathlib import Path
 
  14 from ..db.connection import connect, get_pg_env
 
  15 from ..db import utils as db_utils
 
  16 from ..db.async_connection import DBConnection
 
  17 from .exec_utils import run_osm2pgsql
 
  18 from ..errors import UsageError
 
  19 from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
 
  21 LOG = logging.getLogger()
 
  23 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
 
  24     """ Create a new database for Nominatim and populate it with the
 
  25         essential extensions and data.
 
  27     LOG.warning('Creating database')
 
  28     create_db(dsn, rouser)
 
  30     LOG.warning('Setting up database')
 
  31     with connect(dsn) as conn:
 
  32         setup_extensions(conn)
 
  34     LOG.warning('Loading basic data')
 
  35     import_base_data(dsn, data_dir, no_partitions)
 
  38 def create_db(dsn, rouser=None):
 
  39     """ Create a new database for the given DSN. Fails when the database
 
  40         already exists or the PostgreSQL version is too old.
 
  41         Uses `createdb` to create the database.
 
  43         If 'rouser' is given, then the function also checks that the user
 
  44         with that given name exists.
 
  46         Requires superuser rights by the caller.
 
  48     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
 
  50     if proc.returncode != 0:
 
  51         raise UsageError('Creating new database failed.')
 
  53     with connect(dsn) as conn:
 
  54         postgres_version = conn.server_version_tuple()
 
  55         if postgres_version < POSTGRESQL_REQUIRED_VERSION:
 
  56             LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
 
  57                       'Found version %d.%d.',
 
  58                       POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
 
  59                       postgres_version[0], postgres_version[1])
 
  60             raise UsageError('PostgreSQL server is too old.')
 
  62         if rouser is not None:
 
  63             with conn.cursor() as cur:
 
  64                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
 
  67                     LOG.fatal("Web user '%s' does not exists. Create it with:\n"
 
  68                               "\n      createuser %s", rouser, rouser)
 
  69                     raise UsageError('Missing read-only user.')
 
  73 def setup_extensions(conn):
 
  74     """ Set up all extensions needed for Nominatim. Also checks that the
 
  75         versions of the extensions are sufficient.
 
  77     with conn.cursor() as cur:
 
  78         cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
 
  79         cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
 
  82     postgis_version = conn.postgis_version_tuple()
 
  83     if postgis_version < POSTGIS_REQUIRED_VERSION:
 
  84         LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
 
  85                   'Found version %d.%d.',
 
  86                   POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
 
  87                   postgis_version[0], postgis_version[1])
 
  88         raise UsageError('PostGIS version is too old.')
 
  91 def install_module(src_dir, project_dir, module_dir, conn=None):
 
  92     """ Copy the normalization module from src_dir into the project
 
  93         directory under the '/module' directory. If 'module_dir' is set, then
 
  94         use the module from there instead and check that it is accessible
 
  97         The function detects when the installation is run from the
 
  98         build directory. It doesn't touch the module in that case.
 
 100         If 'conn' is given, then the function also tests if the module
 
 101         can be access via the given database.
 
 104         module_dir = project_dir / 'module'
 
 106         if not module_dir.exists() or not src_dir.samefile(module_dir):
 
 108             if not module_dir.exists():
 
 111             destfile = module_dir / 'nominatim.so'
 
 112             shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
 
 113             destfile.chmod(0o755)
 
 115             LOG.info('Database module installed at %s', str(destfile))
 
 117             LOG.info('Running from build directory. Leaving database module as is.')
 
 119         LOG.info("Using custom path for database module at '%s'", module_dir)
 
 122         with conn.cursor() as cur:
 
 124                 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
 
 125                                RETURNS text AS '{}/nominatim.so', 'transliteration'
 
 126                                LANGUAGE c IMMUTABLE STRICT;
 
 127                                DROP FUNCTION nominatim_test_import_func(text)
 
 128                             """.format(module_dir))
 
 129             except psycopg2.DatabaseError as err:
 
 130                 LOG.fatal("Error accessing database module: %s", err)
 
 131                 raise UsageError("Database module cannot be accessed.") from err
 
 134 def import_base_data(dsn, sql_dir, ignore_partitions=False):
 
 135     """ Create and populate the tables with basic static data that provides
 
 136         the background for geocoding. Data is assumed to not yet exist.
 
 138     db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
 
 139     db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
 
 141     if ignore_partitions:
 
 142         with connect(dsn) as conn:
 
 143             with conn.cursor() as cur:
 
 144                 cur.execute('UPDATE country_name SET partition = 0')
 
 148 def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
 
 149     """ Import the given OSM file. 'options' contains the list of
 
 150         default settings for osm2pgsql.
 
 152     options['import_file'] = osm_file
 
 153     options['append'] = False
 
 154     options['threads'] = 1
 
 156     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
 
 157         # Make some educated guesses about cache size based on the size
 
 158         # of the import file and the available memory.
 
 159         mem = psutil.virtual_memory()
 
 160         fsize = os.stat(str(osm_file)).st_size
 
 161         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
 
 162                                              fsize * 2) / 1024 / 1024) + 1
 
 164     run_osm2pgsql(options)
 
 166     with connect(options['dsn']) as conn:
 
 167         if not ignore_errors:
 
 168             with conn.cursor() as cur:
 
 169                 cur.execute('SELECT * FROM place LIMIT 1')
 
 170                 if cur.rowcount == 0:
 
 171                     raise UsageError('No data imported by osm2pgsql.')
 
 174             conn.drop_table('planet_osm_nodes')
 
 177         if options['flatnode_file']:
 
 178             Path(options['flatnode_file']).unlink()
 
 181 def truncate_data_tables(conn, max_word_frequency=None):
 
 182     """ Truncate all data tables to prepare for a fresh load.
 
 184     with conn.cursor() as cur:
 
 185         cur.execute('TRUNCATE word')
 
 186         cur.execute('TRUNCATE placex')
 
 187         cur.execute('TRUNCATE place_addressline')
 
 188         cur.execute('TRUNCATE location_area')
 
 189         cur.execute('TRUNCATE location_area_country')
 
 190         cur.execute('TRUNCATE location_property')
 
 191         cur.execute('TRUNCATE location_property_tiger')
 
 192         cur.execute('TRUNCATE location_property_osmline')
 
 193         cur.execute('TRUNCATE location_postcode')
 
 194         if conn.table_exists('search_name'):
 
 195             cur.execute('TRUNCATE search_name')
 
 196         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
 
 197         cur.execute('CREATE SEQUENCE seq_place start 100000')
 
 199         cur.execute("""SELECT tablename FROM pg_tables
 
 200                        WHERE tablename LIKE 'location_road_%'""")
 
 202         for table in [r[0] for r in list(cur)]:
 
 203             cur.execute('TRUNCATE ' + table)
 
 205         if max_word_frequency is not None:
 
 206             # Used by getorcreate_word_id to ignore frequent partial words.
 
 207             cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
 
 208                            RETURNS integer AS $$
 
 209                              SELECT {} as maxwordfreq;
 
 210                            $$ LANGUAGE SQL IMMUTABLE
 
 211                         """.format(max_word_frequency))
 
 214 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
 
 216 def load_data(dsn, data_dir, threads):
 
 217     """ Copy data into the word and placex table.
 
 219     # Pre-calculate the most important terms in the word list.
 
 220     db_utils.execute_file(dsn, data_dir / 'words.sql')
 
 222     sel = selectors.DefaultSelector()
 
 223     # Then copy data from place to placex in <threads - 1> chunks.
 
 224     place_threads = max(1, threads - 1)
 
 225     for imod in range(place_threads):
 
 226         conn = DBConnection(dsn)
 
 228         conn.perform("""INSERT INTO placex ({0})
 
 229                          SELECT {0} FROM place
 
 230                          WHERE osm_id % {1} = {2}
 
 231                            AND NOT (class='place' and type='houses')
 
 232                            AND ST_IsValid(geometry)
 
 233                      """.format(_COPY_COLUMNS, place_threads, imod))
 
 234         sel.register(conn, selectors.EVENT_READ, conn)
 
 236     # Address interpolations go into another table.
 
 237     conn = DBConnection(dsn)
 
 239     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
 
 240                       SELECT osm_id, address, geometry FROM place
 
 241                       WHERE class='place' and type='houses' and osm_type='W'
 
 242                             and ST_GeometryType(geometry) = 'ST_LineString'
 
 244     sel.register(conn, selectors.EVENT_READ, conn)
 
 246     # Now wait for all of them to finish.
 
 247     todo = place_threads + 1
 
 249         for key, _ in sel.select(1):
 
 255         print('.', end='', flush=True)
 
 258     with connect(dsn) as conn:
 
 259         with conn.cursor() as cur:
 
 260             cur.execute('ANALYSE')