]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/database_import.py
Merge pull request #2281 from changpingc/changping/fix-tiger-index
[nominatim.git] / nominatim / tools / database_import.py
1 """
2 Functions for setting up and importing a new Nominatim database.
3 """
4 import logging
5 import os
6 import selectors
7 import subprocess
8 import shutil
9 from pathlib import Path
10
11 import psutil
12 import psycopg2
13
14 from nominatim.db.connection import connect, get_pg_env
15 from nominatim.db import utils as db_utils
16 from nominatim.db.async_connection import DBConnection
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
18 from nominatim.tools.exec_utils import run_osm2pgsql
19 from nominatim.errors import UsageError
20 from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
21
22 LOG = logging.getLogger()
23
24 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
25     """ Create a new database for Nominatim and populate it with the
26         essential extensions and data.
27     """
28     LOG.warning('Creating database')
29     create_db(dsn, rouser)
30
31     LOG.warning('Setting up database')
32     with connect(dsn) as conn:
33         setup_extensions(conn)
34
35     LOG.warning('Loading basic data')
36     import_base_data(dsn, data_dir, no_partitions)
37
38
39 def create_db(dsn, rouser=None):
40     """ Create a new database for the given DSN. Fails when the database
41         already exists or the PostgreSQL version is too old.
42         Uses `createdb` to create the database.
43
44         If 'rouser' is given, then the function also checks that the user
45         with that given name exists.
46
47         Requires superuser rights by the caller.
48     """
49     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
50
51     if proc.returncode != 0:
52         raise UsageError('Creating new database failed.')
53
54     with connect(dsn) as conn:
55         postgres_version = conn.server_version_tuple()
56         if postgres_version < POSTGRESQL_REQUIRED_VERSION:
57             LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
58                       'Found version %d.%d.',
59                       POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
60                       postgres_version[0], postgres_version[1])
61             raise UsageError('PostgreSQL server is too old.')
62
63         if rouser is not None:
64             with conn.cursor() as cur:
65                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
66                                  (rouser, ))
67                 if cnt == 0:
68                     LOG.fatal("Web user '%s' does not exists. Create it with:\n"
69                               "\n      createuser %s", rouser, rouser)
70                     raise UsageError('Missing read-only user.')
71
72
73
74 def setup_extensions(conn):
75     """ Set up all extensions needed for Nominatim. Also checks that the
76         versions of the extensions are sufficient.
77     """
78     with conn.cursor() as cur:
79         cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
80         cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
81     conn.commit()
82
83     postgis_version = conn.postgis_version_tuple()
84     if postgis_version < POSTGIS_REQUIRED_VERSION:
85         LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
86                   'Found version %d.%d.',
87                   POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
88                   postgis_version[0], postgis_version[1])
89         raise UsageError('PostGIS version is too old.')
90
91
92 def install_module(src_dir, project_dir, module_dir, conn=None):
93     """ Copy the normalization module from src_dir into the project
94         directory under the '/module' directory. If 'module_dir' is set, then
95         use the module from there instead and check that it is accessible
96         for Postgresql.
97
98         The function detects when the installation is run from the
99         build directory. It doesn't touch the module in that case.
100
101         If 'conn' is given, then the function also tests if the module
102         can be access via the given database.
103     """
104     if not module_dir:
105         module_dir = project_dir / 'module'
106
107         if not module_dir.exists() or not src_dir.samefile(module_dir):
108
109             if not module_dir.exists():
110                 module_dir.mkdir()
111
112             destfile = module_dir / 'nominatim.so'
113             shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
114             destfile.chmod(0o755)
115
116             LOG.info('Database module installed at %s', str(destfile))
117         else:
118             LOG.info('Running from build directory. Leaving database module as is.')
119     else:
120         LOG.info("Using custom path for database module at '%s'", module_dir)
121
122     if conn is not None:
123         with conn.cursor() as cur:
124             try:
125                 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
126                                RETURNS text AS '{}/nominatim.so', 'transliteration'
127                                LANGUAGE c IMMUTABLE STRICT;
128                                DROP FUNCTION nominatim_test_import_func(text)
129                             """.format(module_dir))
130             except psycopg2.DatabaseError as err:
131                 LOG.fatal("Error accessing database module: %s", err)
132                 raise UsageError("Database module cannot be accessed.") from err
133
134
135 def import_base_data(dsn, sql_dir, ignore_partitions=False):
136     """ Create and populate the tables with basic static data that provides
137         the background for geocoding. Data is assumed to not yet exist.
138     """
139     db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
140     db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
141
142     if ignore_partitions:
143         with connect(dsn) as conn:
144             with conn.cursor() as cur:
145                 cur.execute('UPDATE country_name SET partition = 0')
146             conn.commit()
147
148
149 def import_osm_data(osm_file, options, drop=False, ignore_errors=False):
150     """ Import the given OSM file. 'options' contains the list of
151         default settings for osm2pgsql.
152     """
153     options['import_file'] = osm_file
154     options['append'] = False
155     options['threads'] = 1
156
157     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
158         # Make some educated guesses about cache size based on the size
159         # of the import file and the available memory.
160         mem = psutil.virtual_memory()
161         fsize = os.stat(str(osm_file)).st_size
162         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
163                                              fsize * 2) / 1024 / 1024) + 1
164
165     run_osm2pgsql(options)
166
167     with connect(options['dsn']) as conn:
168         if not ignore_errors:
169             with conn.cursor() as cur:
170                 cur.execute('SELECT * FROM place LIMIT 1')
171                 if cur.rowcount == 0:
172                     raise UsageError('No data imported by osm2pgsql.')
173
174         if drop:
175             conn.drop_table('planet_osm_nodes')
176
177     if drop:
178         if options['flatnode_file']:
179             Path(options['flatnode_file']).unlink()
180
181
182 def create_tables(conn, config, sqllib_dir, reverse_only=False):
183     """ Create the set of basic tables.
184         When `reverse_only` is True, then the main table for searching will
185         be skipped and only reverse search is possible.
186     """
187     sql = SQLPreprocessor(conn, config, sqllib_dir)
188     sql.env.globals['db']['reverse_only'] = reverse_only
189
190     sql.run_sql_file(conn, 'tables.sql')
191
192
193 def create_table_triggers(conn, config, sqllib_dir):
194     """ Create the triggers for the tables. The trigger functions must already
195         have been imported with refresh.create_functions().
196     """
197     sql = SQLPreprocessor(conn, config, sqllib_dir)
198     sql.run_sql_file(conn, 'table-triggers.sql')
199
200
201 def create_partition_tables(conn, config, sqllib_dir):
202     """ Create tables that have explicit partitioning.
203     """
204     sql = SQLPreprocessor(conn, config, sqllib_dir)
205     sql.run_sql_file(conn, 'partition-tables.src.sql')
206
207
208 def truncate_data_tables(conn, max_word_frequency=None):
209     """ Truncate all data tables to prepare for a fresh load.
210     """
211     with conn.cursor() as cur:
212         cur.execute('TRUNCATE word')
213         cur.execute('TRUNCATE placex')
214         cur.execute('TRUNCATE place_addressline')
215         cur.execute('TRUNCATE location_area')
216         cur.execute('TRUNCATE location_area_country')
217         cur.execute('TRUNCATE location_property')
218         cur.execute('TRUNCATE location_property_tiger')
219         cur.execute('TRUNCATE location_property_osmline')
220         cur.execute('TRUNCATE location_postcode')
221         if conn.table_exists('search_name'):
222             cur.execute('TRUNCATE search_name')
223         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
224         cur.execute('CREATE SEQUENCE seq_place start 100000')
225
226         cur.execute("""SELECT tablename FROM pg_tables
227                        WHERE tablename LIKE 'location_road_%'""")
228
229         for table in [r[0] for r in list(cur)]:
230             cur.execute('TRUNCATE ' + table)
231
232         if max_word_frequency is not None:
233             # Used by getorcreate_word_id to ignore frequent partial words.
234             cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
235                            RETURNS integer AS $$
236                              SELECT {} as maxwordfreq;
237                            $$ LANGUAGE SQL IMMUTABLE
238                         """.format(max_word_frequency))
239         conn.commit()
240
241 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
242
243 def load_data(dsn, data_dir, threads):
244     """ Copy data into the word and placex table.
245     """
246     # Pre-calculate the most important terms in the word list.
247     db_utils.execute_file(dsn, data_dir / 'words.sql')
248
249     sel = selectors.DefaultSelector()
250     # Then copy data from place to placex in <threads - 1> chunks.
251     place_threads = max(1, threads - 1)
252     for imod in range(place_threads):
253         conn = DBConnection(dsn)
254         conn.connect()
255         conn.perform("""INSERT INTO placex ({0})
256                          SELECT {0} FROM place
257                          WHERE osm_id % {1} = {2}
258                            AND NOT (class='place' and type='houses')
259                            AND ST_IsValid(geometry)
260                      """.format(_COPY_COLUMNS, place_threads, imod))
261         sel.register(conn, selectors.EVENT_READ, conn)
262
263     # Address interpolations go into another table.
264     conn = DBConnection(dsn)
265     conn.connect()
266     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
267                       SELECT osm_id, address, geometry FROM place
268                       WHERE class='place' and type='houses' and osm_type='W'
269                             and ST_GeometryType(geometry) = 'ST_LineString'
270                  """)
271     sel.register(conn, selectors.EVENT_READ, conn)
272
273     # Now wait for all of them to finish.
274     todo = place_threads + 1
275     while todo > 0:
276         for key, _ in sel.select(1):
277             conn = key.data
278             sel.unregister(conn)
279             conn.wait()
280             conn.close()
281             todo -= 1
282         print('.', end='', flush=True)
283     print('\n')
284
285     with connect(dsn) as conn:
286         with conn.cursor() as cur:
287             cur.execute('ANALYSE')
288
289
290 def create_search_indices(conn, config, sqllib_dir, drop=False):
291     """ Create tables that have explicit partitioning.
292     """
293
294     # If index creation failed and left an index invalid, they need to be
295     # cleaned out first, so that the script recreates them.
296     with conn.cursor() as cur:
297         cur.execute("""SELECT relname FROM pg_class, pg_index
298                        WHERE pg_index.indisvalid = false
299                              AND pg_index.indexrelid = pg_class.oid""")
300         bad_indices = [row[0] for row in list(cur)]
301         for idx in bad_indices:
302             LOG.info("Drop invalid index %s.", idx)
303             cur.execute('DROP INDEX "{}"'.format(idx))
304     conn.commit()
305
306     sql = SQLPreprocessor(conn, config, sqllib_dir)
307
308     sql.run_sql_file(conn, 'indices.sql', drop=drop)
309
310 def create_country_names(conn, config):
311     """ Create search index for default country names.
312     """
313
314     with conn.cursor() as cur:
315         cur.execute("""SELECT getorcreate_country(make_standard_name('uk'), 'gb')""")
316         cur.execute("""SELECT getorcreate_country(make_standard_name('united states'), 'us')""")
317         cur.execute("""SELECT COUNT(*) FROM
318                        (SELECT getorcreate_country(make_standard_name(country_code),
319                        country_code) FROM country_name WHERE country_code is not null) AS x""")
320         cur.execute("""SELECT COUNT(*) FROM
321                        (SELECT getorcreate_country(make_standard_name(name->'name'), country_code) 
322                        FROM country_name WHERE name ? 'name') AS x""")
323         sql_statement = """SELECT COUNT(*) FROM (SELECT getorcreate_country(make_standard_name(v),
324                            country_code) FROM (SELECT country_code, skeys(name)
325                            AS k, svals(name) AS v FROM country_name) x WHERE k"""
326
327         languages = config.LANGUAGES
328
329         if languages:
330             sql_statement = "{} IN (".format(sql_statement)
331             delim = ''
332             for language in languages.split(','):
333                 sql_statement = "{}{}'name:{}'".format(sql_statement, delim, language)
334                 delim = ', '
335             sql_statement = '{})'.format(sql_statement)
336         else:
337             sql_statement = "{} LIKE 'name:%'".format(sql_statement)
338         sql_statement = "{}) v".format(sql_statement)
339         cur.execute(sql_statement)
340     conn.commit()