]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/database_import.py
Merge pull request #2303 from lonvia/remove-aux-support
[nominatim.git] / nominatim / tools / database_import.py
1 """
2 Functions for setting up and importing a new Nominatim database.
3 """
4 import logging
5 import os
6 import selectors
7 import subprocess
8 import shutil
9 from pathlib import Path
10
11 import psutil
12 import psycopg2
13
14 from nominatim.db.connection import connect, get_pg_env
15 from nominatim.db import utils as db_utils
16 from nominatim.db.async_connection import DBConnection
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
18 from nominatim.tools.exec_utils import run_osm2pgsql
19 from nominatim.errors import UsageError
20 from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
21
22 LOG = logging.getLogger()
23
24 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
25     """ Create a new database for Nominatim and populate it with the
26         essential extensions and data.
27     """
28     LOG.warning('Creating database')
29     create_db(dsn, rouser)
30
31     LOG.warning('Setting up database')
32     with connect(dsn) as conn:
33         setup_extensions(conn)
34
35     LOG.warning('Loading basic data')
36     import_base_data(dsn, data_dir, no_partitions)
37
38
39 def create_db(dsn, rouser=None):
40     """ Create a new database for the given DSN. Fails when the database
41         already exists or the PostgreSQL version is too old.
42         Uses `createdb` to create the database.
43
44         If 'rouser' is given, then the function also checks that the user
45         with that given name exists.
46
47         Requires superuser rights by the caller.
48     """
49     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
50
51     if proc.returncode != 0:
52         raise UsageError('Creating new database failed.')
53
54     with connect(dsn) as conn:
55         postgres_version = conn.server_version_tuple()
56         if postgres_version < POSTGRESQL_REQUIRED_VERSION:
57             LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
58                       'Found version %d.%d.',
59                       POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
60                       postgres_version[0], postgres_version[1])
61             raise UsageError('PostgreSQL server is too old.')
62
63         if rouser is not None:
64             with conn.cursor() as cur:
65                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
66                                  (rouser, ))
67                 if cnt == 0:
68                     LOG.fatal("Web user '%s' does not exists. Create it with:\n"
69                               "\n      createuser %s", rouser, rouser)
70                     raise UsageError('Missing read-only user.')
71
72
73
74 def setup_extensions(conn):
75     """ Set up all extensions needed for Nominatim. Also checks that the
76         versions of the extensions are sufficient.
77     """
78     with conn.cursor() as cur:
79         cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
80         cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
81     conn.commit()
82
83     postgis_version = conn.postgis_version_tuple()
84     if postgis_version < POSTGIS_REQUIRED_VERSION:
85         LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
86                   'Found version %d.%d.',
87                   POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
88                   postgis_version[0], postgis_version[1])
89         raise UsageError('PostGIS version is too old.')
90
91
92 def install_module(src_dir, project_dir, module_dir, conn=None):
93     """ Copy the normalization module from src_dir into the project
94         directory under the '/module' directory. If 'module_dir' is set, then
95         use the module from there instead and check that it is accessible
96         for Postgresql.
97
98         The function detects when the installation is run from the
99         build directory. It doesn't touch the module in that case.
100
101         If 'conn' is given, then the function also tests if the module
102         can be access via the given database.
103     """
104     if not module_dir:
105         module_dir = project_dir / 'module'
106
107         if not module_dir.exists() or not src_dir.samefile(module_dir):
108
109             if not module_dir.exists():
110                 module_dir.mkdir()
111
112             destfile = module_dir / 'nominatim.so'
113             shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
114             destfile.chmod(0o755)
115
116             LOG.info('Database module installed at %s', str(destfile))
117         else:
118             LOG.info('Running from build directory. Leaving database module as is.')
119     else:
120         LOG.info("Using custom path for database module at '%s'", module_dir)
121
122     if conn is not None:
123         with conn.cursor() as cur:
124             try:
125                 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
126                                RETURNS text AS '{}/nominatim.so', 'transliteration'
127                                LANGUAGE c IMMUTABLE STRICT;
128                                DROP FUNCTION nominatim_test_import_func(text)
129                             """.format(module_dir))
130             except psycopg2.DatabaseError as err:
131                 LOG.fatal("Error accessing database module: %s", err)
132                 raise UsageError("Database module cannot be accessed.") from err
133
134
135 def import_base_data(dsn, sql_dir, ignore_partitions=False):
136     """ Create and populate the tables with basic static data that provides
137         the background for geocoding. Data is assumed to not yet exist.
138     """
139     db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
140     db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
141
142     if ignore_partitions:
143         with connect(dsn) as conn:
144             with conn.cursor() as cur:
145                 cur.execute('UPDATE country_name SET partition = 0')
146             conn.commit()
147
148
149 def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
150     """ Import the given OSM file. 'options' contains the list of
151         default settings for osm2pgsql.
152     """
153     options['import_file'] = osm_file
154     options['append'] = False
155     options['threads'] = 1
156
157     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
158         # Make some educated guesses about cache size based on the size
159         # of the import file and the available memory.
160         mem = psutil.virtual_memory()
161         fsize = os.stat(str(osm_file)).st_size
162         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
163                                              fsize * 2) / 1024 / 1024) + 1
164
165     run_osm2pgsql(options)
166
167     with connect(options['dsn']) as conn:
168         if not ignore_errors:
169             with conn.cursor() as cur:
170                 cur.execute('SELECT * FROM place LIMIT 1')
171                 if cur.rowcount == 0:
172                     raise UsageError('No data imported by osm2pgsql.')
173
174         if drop:
175             conn.drop_table('planet_osm_nodes')
176
177     if drop:
178         if options['flatnode_file']:
179             Path(options['flatnode_file']).unlink()
180
181
182 def create_tables(conn, config, reverse_only=False):
183     """ Create the set of basic tables.
184         When `reverse_only` is True, then the main table for searching will
185         be skipped and only reverse search is possible.
186     """
187     sql = SQLPreprocessor(conn, config)
188     sql.env.globals['db']['reverse_only'] = reverse_only
189
190     sql.run_sql_file(conn, 'tables.sql')
191
192
193 def create_table_triggers(conn, config):
194     """ Create the triggers for the tables. The trigger functions must already
195         have been imported with refresh.create_functions().
196     """
197     sql = SQLPreprocessor(conn, config)
198     sql.run_sql_file(conn, 'table-triggers.sql')
199
200
201 def create_partition_tables(conn, config):
202     """ Create tables that have explicit partitioning.
203     """
204     sql = SQLPreprocessor(conn, config)
205     sql.run_sql_file(conn, 'partition-tables.src.sql')
206
207
208 def truncate_data_tables(conn, max_word_frequency=None):
209     """ Truncate all data tables to prepare for a fresh load.
210     """
211     with conn.cursor() as cur:
212         cur.execute('TRUNCATE word')
213         cur.execute('TRUNCATE placex')
214         cur.execute('TRUNCATE place_addressline')
215         cur.execute('TRUNCATE location_area')
216         cur.execute('TRUNCATE location_area_country')
217         cur.execute('TRUNCATE location_property_tiger')
218         cur.execute('TRUNCATE location_property_osmline')
219         cur.execute('TRUNCATE location_postcode')
220         if conn.table_exists('search_name'):
221             cur.execute('TRUNCATE search_name')
222         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
223         cur.execute('CREATE SEQUENCE seq_place start 100000')
224
225         cur.execute("""SELECT tablename FROM pg_tables
226                        WHERE tablename LIKE 'location_road_%'""")
227
228         for table in [r[0] for r in list(cur)]:
229             cur.execute('TRUNCATE ' + table)
230
231         if max_word_frequency is not None:
232             # Used by getorcreate_word_id to ignore frequent partial words.
233             cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
234                            RETURNS integer AS $$
235                              SELECT {} as maxwordfreq;
236                            $$ LANGUAGE SQL IMMUTABLE
237                         """.format(max_word_frequency))
238         conn.commit()
239
240 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
241
242 def load_data(dsn, data_dir, threads):
243     """ Copy data into the word and placex table.
244     """
245     # Pre-calculate the most important terms in the word list.
246     db_utils.execute_file(dsn, data_dir / 'words.sql')
247
248     sel = selectors.DefaultSelector()
249     # Then copy data from place to placex in <threads - 1> chunks.
250     place_threads = max(1, threads - 1)
251     for imod in range(place_threads):
252         conn = DBConnection(dsn)
253         conn.connect()
254         conn.perform("""INSERT INTO placex ({0})
255                          SELECT {0} FROM place
256                          WHERE osm_id % {1} = {2}
257                            AND NOT (class='place' and type='houses')
258                            AND ST_IsValid(geometry)
259                      """.format(_COPY_COLUMNS, place_threads, imod))
260         sel.register(conn, selectors.EVENT_READ, conn)
261
262     # Address interpolations go into another table.
263     conn = DBConnection(dsn)
264     conn.connect()
265     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
266                       SELECT osm_id, address, geometry FROM place
267                       WHERE class='place' and type='houses' and osm_type='W'
268                             and ST_GeometryType(geometry) = 'ST_LineString'
269                  """)
270     sel.register(conn, selectors.EVENT_READ, conn)
271
272     # Now wait for all of them to finish.
273     todo = place_threads + 1
274     while todo > 0:
275         for key, _ in sel.select(1):
276             conn = key.data
277             sel.unregister(conn)
278             conn.wait()
279             conn.close()
280             todo -= 1
281         print('.', end='', flush=True)
282     print('\n')
283
284     with connect(dsn) as conn:
285         with conn.cursor() as cur:
286             cur.execute('ANALYSE')
287
288
289 def create_search_indices(conn, config, drop=False):
290     """ Create tables that have explicit partitioning.
291     """
292
293     # If index creation failed and left an index invalid, they need to be
294     # cleaned out first, so that the script recreates them.
295     with conn.cursor() as cur:
296         cur.execute("""SELECT relname FROM pg_class, pg_index
297                        WHERE pg_index.indisvalid = false
298                              AND pg_index.indexrelid = pg_class.oid""")
299         bad_indices = [row[0] for row in list(cur)]
300         for idx in bad_indices:
301             LOG.info("Drop invalid index %s.", idx)
302             cur.execute('DROP INDEX "{}"'.format(idx))
303     conn.commit()
304
305     sql = SQLPreprocessor(conn, config)
306
307     sql.run_sql_file(conn, 'indices.sql', drop=drop)
308
309 def create_country_names(conn, config):
310     """ Create search index for default country names.
311     """
312
313     with conn.cursor() as cur:
314         cur.execute("""SELECT getorcreate_country(make_standard_name('uk'), 'gb')""")
315         cur.execute("""SELECT getorcreate_country(make_standard_name('united states'), 'us')""")
316         cur.execute("""SELECT COUNT(*) FROM
317                        (SELECT getorcreate_country(make_standard_name(country_code),
318                        country_code) FROM country_name WHERE country_code is not null) AS x""")
319         cur.execute("""SELECT COUNT(*) FROM
320                        (SELECT getorcreate_country(make_standard_name(name->'name'), country_code) 
321                        FROM country_name WHERE name ? 'name') AS x""")
322         sql_statement = """SELECT COUNT(*) FROM (SELECT getorcreate_country(make_standard_name(v),
323                            country_code) FROM (SELECT country_code, skeys(name)
324                            AS k, svals(name) AS v FROM country_name) x WHERE k"""
325
326         languages = config.LANGUAGES
327
328         if languages:
329             sql_statement = "{} IN (".format(sql_statement)
330             delim = ''
331             for language in languages.split(','):
332                 sql_statement = "{}{}'name:{}'".format(sql_statement, delim, language)
333                 delim = ', '
334             sql_statement = '{})'.format(sql_statement)
335         else:
336             sql_statement = "{} LIKE 'name:%'".format(sql_statement)
337         sql_statement = "{}) v".format(sql_statement)
338         cur.execute(sql_statement)
339     conn.commit()