2 Functions for setting up and importing a new Nominatim database.
9 from pathlib import Path
14 from nominatim.db.connection import connect, get_pg_env
15 from nominatim.db import utils as db_utils
16 from nominatim.db.async_connection import DBConnection
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
18 from nominatim.tools.exec_utils import run_osm2pgsql
19 from nominatim.errors import UsageError
20 from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
22 LOG = logging.getLogger()
24 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
25 """ Create a new database for Nominatim and populate it with the
26 essential extensions and data.
28 LOG.warning('Creating database')
29 create_db(dsn, rouser)
31 LOG.warning('Setting up database')
32 with connect(dsn) as conn:
33 setup_extensions(conn)
35 LOG.warning('Loading basic data')
36 import_base_data(dsn, data_dir, no_partitions)
39 def create_db(dsn, rouser=None):
40 """ Create a new database for the given DSN. Fails when the database
41 already exists or the PostgreSQL version is too old.
42 Uses `createdb` to create the database.
44 If 'rouser' is given, then the function also checks that the user
45 with that given name exists.
47 Requires superuser rights by the caller.
49 proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
51 if proc.returncode != 0:
52 raise UsageError('Creating new database failed.')
54 with connect(dsn) as conn:
55 postgres_version = conn.server_version_tuple()
56 if postgres_version < POSTGRESQL_REQUIRED_VERSION:
57 LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
58 'Found version %d.%d.',
59 POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
60 postgres_version[0], postgres_version[1])
61 raise UsageError('PostgreSQL server is too old.')
63 if rouser is not None:
64 with conn.cursor() as cur:
65 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
68 LOG.fatal("Web user '%s' does not exists. Create it with:\n"
69 "\n createuser %s", rouser, rouser)
70 raise UsageError('Missing read-only user.')
74 def setup_extensions(conn):
75 """ Set up all extensions needed for Nominatim. Also checks that the
76 versions of the extensions are sufficient.
78 with conn.cursor() as cur:
79 cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
80 cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
83 postgis_version = conn.postgis_version_tuple()
84 if postgis_version < POSTGIS_REQUIRED_VERSION:
85 LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
86 'Found version %d.%d.',
87 POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
88 postgis_version[0], postgis_version[1])
89 raise UsageError('PostGIS version is too old.')
92 def install_module(src_dir, project_dir, module_dir, conn=None):
93 """ Copy the normalization module from src_dir into the project
94 directory under the '/module' directory. If 'module_dir' is set, then
95 use the module from there instead and check that it is accessible
98 The function detects when the installation is run from the
99 build directory. It doesn't touch the module in that case.
101 If 'conn' is given, then the function also tests if the module
102 can be access via the given database.
105 module_dir = project_dir / 'module'
107 if not module_dir.exists() or not src_dir.samefile(module_dir):
109 if not module_dir.exists():
112 destfile = module_dir / 'nominatim.so'
113 shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
114 destfile.chmod(0o755)
116 LOG.info('Database module installed at %s', str(destfile))
118 LOG.info('Running from build directory. Leaving database module as is.')
120 LOG.info("Using custom path for database module at '%s'", module_dir)
123 with conn.cursor() as cur:
125 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
126 RETURNS text AS '{}/nominatim.so', 'transliteration'
127 LANGUAGE c IMMUTABLE STRICT;
128 DROP FUNCTION nominatim_test_import_func(text)
129 """.format(module_dir))
130 except psycopg2.DatabaseError as err:
131 LOG.fatal("Error accessing database module: %s", err)
132 raise UsageError("Database module cannot be accessed.") from err
135 def import_base_data(dsn, sql_dir, ignore_partitions=False):
136 """ Create and populate the tables with basic static data that provides
137 the background for geocoding. Data is assumed to not yet exist.
139 db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
140 db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
142 if ignore_partitions:
143 with connect(dsn) as conn:
144 with conn.cursor() as cur:
145 cur.execute('UPDATE country_name SET partition = 0')
149 def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
150 """ Import the given OSM file. 'options' contains the list of
151 default settings for osm2pgsql.
153 options['import_file'] = osm_file
154 options['append'] = False
155 options['threads'] = 1
157 if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
158 # Make some educated guesses about cache size based on the size
159 # of the import file and the available memory.
160 mem = psutil.virtual_memory()
161 fsize = os.stat(str(osm_file)).st_size
162 options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
163 fsize * 2) / 1024 / 1024) + 1
165 run_osm2pgsql(options)
167 with connect(options['dsn']) as conn:
168 if not ignore_errors:
169 with conn.cursor() as cur:
170 cur.execute('SELECT * FROM place LIMIT 1')
171 if cur.rowcount == 0:
172 raise UsageError('No data imported by osm2pgsql.')
175 conn.drop_table('planet_osm_nodes')
178 if options['flatnode_file']:
179 Path(options['flatnode_file']).unlink()
182 def create_tables(conn, config, reverse_only=False):
183 """ Create the set of basic tables.
184 When `reverse_only` is True, then the main table for searching will
185 be skipped and only reverse search is possible.
187 sql = SQLPreprocessor(conn, config)
188 sql.env.globals['db']['reverse_only'] = reverse_only
190 sql.run_sql_file(conn, 'tables.sql')
193 def create_table_triggers(conn, config):
194 """ Create the triggers for the tables. The trigger functions must already
195 have been imported with refresh.create_functions().
197 sql = SQLPreprocessor(conn, config)
198 sql.run_sql_file(conn, 'table-triggers.sql')
201 def create_partition_tables(conn, config):
202 """ Create tables that have explicit partitioning.
204 sql = SQLPreprocessor(conn, config)
205 sql.run_sql_file(conn, 'partition-tables.src.sql')
208 def truncate_data_tables(conn, max_word_frequency=None):
209 """ Truncate all data tables to prepare for a fresh load.
211 with conn.cursor() as cur:
212 cur.execute('TRUNCATE word')
213 cur.execute('TRUNCATE placex')
214 cur.execute('TRUNCATE place_addressline')
215 cur.execute('TRUNCATE location_area')
216 cur.execute('TRUNCATE location_area_country')
217 cur.execute('TRUNCATE location_property_tiger')
218 cur.execute('TRUNCATE location_property_osmline')
219 cur.execute('TRUNCATE location_postcode')
220 if conn.table_exists('search_name'):
221 cur.execute('TRUNCATE search_name')
222 cur.execute('DROP SEQUENCE IF EXISTS seq_place')
223 cur.execute('CREATE SEQUENCE seq_place start 100000')
225 cur.execute("""SELECT tablename FROM pg_tables
226 WHERE tablename LIKE 'location_road_%'""")
228 for table in [r[0] for r in list(cur)]:
229 cur.execute('TRUNCATE ' + table)
231 if max_word_frequency is not None:
232 # Used by getorcreate_word_id to ignore frequent partial words.
233 cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
234 RETURNS integer AS $$
235 SELECT {} as maxwordfreq;
236 $$ LANGUAGE SQL IMMUTABLE
237 """.format(max_word_frequency))
240 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
242 def load_data(dsn, data_dir, threads):
243 """ Copy data into the word and placex table.
245 # Pre-calculate the most important terms in the word list.
246 db_utils.execute_file(dsn, data_dir / 'words.sql')
248 sel = selectors.DefaultSelector()
249 # Then copy data from place to placex in <threads - 1> chunks.
250 place_threads = max(1, threads - 1)
251 for imod in range(place_threads):
252 conn = DBConnection(dsn)
254 conn.perform("""INSERT INTO placex ({0})
255 SELECT {0} FROM place
256 WHERE osm_id % {1} = {2}
257 AND NOT (class='place' and type='houses')
258 AND ST_IsValid(geometry)
259 """.format(_COPY_COLUMNS, place_threads, imod))
260 sel.register(conn, selectors.EVENT_READ, conn)
262 # Address interpolations go into another table.
263 conn = DBConnection(dsn)
265 conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
266 SELECT osm_id, address, geometry FROM place
267 WHERE class='place' and type='houses' and osm_type='W'
268 and ST_GeometryType(geometry) = 'ST_LineString'
270 sel.register(conn, selectors.EVENT_READ, conn)
272 # Now wait for all of them to finish.
273 todo = place_threads + 1
275 for key, _ in sel.select(1):
281 print('.', end='', flush=True)
284 with connect(dsn) as conn:
285 with conn.cursor() as cur:
286 cur.execute('ANALYSE')
289 def create_search_indices(conn, config, drop=False):
290 """ Create tables that have explicit partitioning.
293 # If index creation failed and left an index invalid, they need to be
294 # cleaned out first, so that the script recreates them.
295 with conn.cursor() as cur:
296 cur.execute("""SELECT relname FROM pg_class, pg_index
297 WHERE pg_index.indisvalid = false
298 AND pg_index.indexrelid = pg_class.oid""")
299 bad_indices = [row[0] for row in list(cur)]
300 for idx in bad_indices:
301 LOG.info("Drop invalid index %s.", idx)
302 cur.execute('DROP INDEX "{}"'.format(idx))
305 sql = SQLPreprocessor(conn, config)
307 sql.run_sql_file(conn, 'indices.sql', drop=drop)
309 def create_country_names(conn, config):
310 """ Create search index for default country names.
313 with conn.cursor() as cur:
314 cur.execute("""SELECT getorcreate_country(make_standard_name('uk'), 'gb')""")
315 cur.execute("""SELECT getorcreate_country(make_standard_name('united states'), 'us')""")
316 cur.execute("""SELECT COUNT(*) FROM
317 (SELECT getorcreate_country(make_standard_name(country_code),
318 country_code) FROM country_name WHERE country_code is not null) AS x""")
319 cur.execute("""SELECT COUNT(*) FROM
320 (SELECT getorcreate_country(make_standard_name(name->'name'), country_code)
321 FROM country_name WHERE name ? 'name') AS x""")
322 sql_statement = """SELECT COUNT(*) FROM (SELECT getorcreate_country(make_standard_name(v),
323 country_code) FROM (SELECT country_code, skeys(name)
324 AS k, svals(name) AS v FROM country_name) x WHERE k"""
326 languages = config.LANGUAGES
329 sql_statement = "{} IN (".format(sql_statement)
331 for language in languages.split(','):
332 sql_statement = "{}{}'name:{}'".format(sql_statement, delim, language)
334 sql_statement = '{})'.format(sql_statement)
336 sql_statement = "{} LIKE 'name:%'".format(sql_statement)
337 sql_statement = "{}) v".format(sql_statement)
338 cur.execute(sql_statement)