2 Functions for setting up and importing a new Nominatim database.
9 from pathlib import Path
14 from ..db.connection import connect, get_pg_env
15 from ..db import utils as db_utils
16 from ..db.async_connection import DBConnection
17 from .exec_utils import run_osm2pgsql
18 from ..errors import UsageError
19 from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
21 LOG = logging.getLogger()
def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
    """ Create a new database for Nominatim and populate it with the
        essential extensions and data.

        dsn           -- connection string for the database to create
        data_dir      -- directory containing the static SQL data files
        no_partitions -- when True, country partitioning is disabled
                         (all countries collapsed into partition 0)
        rouser        -- optional name of a read-only user whose
                         existence is verified after database creation
    """
    LOG.warning('Creating database')
    create_db(dsn, rouser)

    LOG.warning('Setting up database')
    with connect(dsn) as conn:
        setup_extensions(conn)

    LOG.warning('Loading basic data')
    import_base_data(dsn, data_dir, no_partitions)
def create_db(dsn, rouser=None):
    """ Create a new database for the given DSN. Fails when the database
        already exists or the PostgreSQL version is too old.
        Uses `createdb` to create the database.

        If 'rouser' is given, then the function also checks that the user
        with that given name exists.

        Requires superuser rights by the caller.

        Raises UsageError when the database cannot be created, the server
        is too old, or the read-only user is missing.
    """
    # 'createdb' picks up the target database from the PG* environment
    # variables derived from the DSN; check=False so we can raise our own
    # error instead of a CalledProcessError.
    proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)

    if proc.returncode != 0:
        raise UsageError('Creating new database failed.')

    with connect(dsn) as conn:
        postgres_version = conn.server_version_tuple()
        if postgres_version < POSTGRESQL_REQUIRED_VERSION:
            LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
                      'Found version %d.%d.',
                      POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
                      postgres_version[0], postgres_version[1])
            raise UsageError('PostgreSQL server is too old.')

        if rouser is not None:
            with conn.cursor() as cur:
                cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
                                 (rouser, ))
                # Only complain when the user really is absent.
                if cnt == 0:
                    LOG.fatal("Web user '%s' does not exist. Create it with:\n"
                              "\n      createuser %s", rouser, rouser)
                    raise UsageError('Missing read-only user.')
def setup_extensions(conn):
    """ Set up all extensions needed for Nominatim. Also checks that the
        versions of the extensions are sufficient.

        Raises UsageError when the PostGIS version is too old.
    """
    with conn.cursor() as cur:
        cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
        cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
    # psycopg2 connections are transactional by default: without an
    # explicit commit the CREATE EXTENSION statements would be rolled
    # back when the connection is closed.
    conn.commit()

    postgis_version = conn.postgis_version_tuple()
    if postgis_version < POSTGIS_REQUIRED_VERSION:
        LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
                  'Found version %d.%d.',
                  POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
                  postgis_version[0], postgis_version[1])
        raise UsageError('PostGIS version is too old.')
def install_module(src_dir, project_dir, module_dir, conn=None):
    """ Copy the normalization module from src_dir into the project
        directory under the '/module' directory. If 'module_dir' is set, then
        use the module from there instead and check that it is accessible
        for PostgreSQL.

        The function detects when the installation is run from the
        build directory. It doesn't touch the module in that case.

        If 'conn' is given, then the function also tests if the module
        can be accessed via the given database.

        Raises UsageError when the module cannot be loaded by the server.
    """
    if not module_dir:
        module_dir = project_dir / 'module'

        # When running from the build directory, src and destination are
        # the same file; leave the module alone in that case.
        if not module_dir.exists() or not src_dir.samefile(module_dir):
            if not module_dir.exists():
                module_dir.mkdir()

            destfile = module_dir / 'nominatim.so'
            shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
            # The server process must be able to read and execute the module.
            destfile.chmod(0o755)

            LOG.info('Database module installed at %s', str(destfile))
        else:
            LOG.info('Running from build directory. Leaving database module as is.')
    else:
        LOG.info("Using custom path for database module at '%s'", module_dir)

    if conn is not None:
        # Smoke-test: create and immediately drop a function backed by the
        # module to verify the server can actually load it.
        with conn.cursor() as cur:
            try:
                cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
                               RETURNS text AS '{}/nominatim.so', 'transliteration'
                               LANGUAGE c IMMUTABLE STRICT;
                               DROP FUNCTION nominatim_test_import_func(text)
                            """.format(module_dir))
            except psycopg2.DatabaseError as err:
                LOG.fatal("Error accessing database module: %s", err)
                raise UsageError("Database module cannot be accessed.") from err
def import_base_data(dsn, sql_dir, ignore_partitions=False):
    """ Create and populate the tables with basic static data that provides
        the background for geocoding. Data is assumed to not yet exist.

        When 'ignore_partitions' is True, all countries are moved into
        partition 0, effectively disabling country partitioning.
    """
    db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
    db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')

    if ignore_partitions:
        with connect(dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE country_name SET partition = 0')
            # Explicit commit needed: psycopg2 rolls back uncommitted
            # transactions when the connection closes.
            conn.commit()
def import_osm_data(osm_file, options, drop=False):
    """ Import the given OSM file. 'options' contains the list of
        default settings for osm2pgsql.

        When 'drop' is True, data only needed for updates (node cache
        table and flatnode file) is removed after a successful import.

        Raises UsageError when osm2pgsql did not import any data.
    """
    options['import_file'] = osm_file
    options['append'] = False
    options['threads'] = 1

    if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
        # Make some educated guesses about cache size based on the size
        # of the import file and the available memory.
        mem = psutil.virtual_memory()
        fsize = os.stat(str(osm_file)).st_size
        options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
                                             fsize * 2) / 1024 / 1024) + 1

    run_osm2pgsql(options)

    with connect(options['dsn']) as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM place LIMIT 1')
            if cur.rowcount == 0:
                raise UsageError('No data imported by osm2pgsql.')

        # The node cache is only needed for later updates; drop it when
        # the caller asked for a one-shot import.
        if drop:
            conn.drop_table('planet_osm_nodes')

    if drop and options['flatnode_file']:
        Path(options['flatnode_file']).unlink()
def truncate_data_tables(conn, max_word_frequency=None):
    """ Truncate all data tables to prepare for a fresh load.

        If 'max_word_frequency' is given, the helper function
        get_maxwordfreq() is (re)created to return that value.
    """
    # Fixed set of tables that always need clearing.
    static_tables = ('word', 'placex', 'place_addressline',
                     'location_area', 'location_area_country',
                     'location_property', 'location_property_tiger',
                     'location_property_osmline', 'location_postcode',
                     'search_name')

    with conn.cursor() as cur:
        for table in static_tables:
            cur.execute('TRUNCATE ' + table)

        # Restart place ID numbering from scratch.
        cur.execute('DROP SEQUENCE IF EXISTS seq_place')
        cur.execute('CREATE SEQUENCE seq_place start 100000')

        # Road tables are created per partition, so their names must be
        # looked up dynamically.
        cur.execute("""SELECT tablename FROM pg_tables
                       WHERE tablename LIKE 'location_road_%'""")
        road_tables = [row[0] for row in cur]

        for table in road_tables:
            cur.execute('TRUNCATE ' + table)

        if max_word_frequency is not None:
            # Used by getorcreate_word_id to ignore frequent partial words.
            cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
                           RETURNS integer AS $$
                             SELECT {} as maxwordfreq;
                           $$ LANGUAGE SQL IMMUTABLE
                        """.format(max_word_frequency))
# Columns transferred verbatim from the 'place' table into 'placex'
# by load_data() below.
_COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
def load_data(dsn, data_dir, threads):
    """ Copy data into the word and placex table.

        dsn      -- database connection string
        data_dir -- directory containing the pre-computed words.sql file
        threads  -- total number of threads to use; one is reserved for
                    the interpolation import, the rest copy placex data
    """
    # Pre-calculate the most important terms in the word list.
    db_utils.execute_file(dsn, data_dir / 'words.sql')

    sel = selectors.DefaultSelector()
    # Then copy data from place to placex in <threads - 1> chunks.
    place_threads = max(1, threads - 1)
    for imod in range(place_threads):
        conn = DBConnection(dsn)
        conn.perform("""INSERT INTO placex ({0})
                         SELECT {0} FROM place
                         WHERE osm_id % {1} = {2}
                           AND NOT (class='place' and type='houses')
                           AND ST_IsValid(geometry)
                     """.format(_COPY_COLUMNS, place_threads, imod))
        sel.register(conn, selectors.EVENT_READ, conn)

    # Address interpolations go into another table.
    conn = DBConnection(dsn)
    conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                     SELECT osm_id, address, geometry FROM place
                      WHERE class='place' and type='houses' and osm_type='W'
                            and ST_GeometryType(geometry) = 'ST_LineString'
                 """)
    sel.register(conn, selectors.EVENT_READ, conn)

    # Now wait for all of them to finish.
    todo = place_threads + 1
    while todo > 0:
        for key, _ in sel.select(1):
            # The data payload registered above is the connection itself.
            conn = key.data
            sel.unregister(conn)
            conn.wait()
            conn.close()
            todo -= 1
        # Simple progress indicator while waiting.
        print('.', end='', flush=True)
    print('\n')

    # Refresh planner statistics after the bulk load.
    with connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('ANALYSE')