]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/database_import.py
move setup function to python
[nominatim.git] / nominatim / tools / database_import.py
1 """
2 Functions for setting up and importing a new Nominatim database.
3 """
4 import logging
5 import os
6 import selectors
7 import subprocess
8 import shutil
9 from pathlib import Path
10
11 import psutil
12 import psycopg2
13
14 from ..db.connection import connect, get_pg_env
15 from ..db import utils as db_utils
16 from ..db.async_connection import DBConnection
17 from .exec_utils import run_osm2pgsql
18 from ..errors import UsageError
19 from ..version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
20
21 LOG = logging.getLogger()
22
23 def setup_database_skeleton(dsn, data_dir, no_partitions, rouser=None):
24     """ Create a new database for Nominatim and populate it with the
25         essential extensions and data.
26     """
27     LOG.warning('Creating database')
28     create_db(dsn, rouser)
29
30     LOG.warning('Setting up database')
31     with connect(dsn) as conn:
32         setup_extensions(conn)
33
34     LOG.warning('Loading basic data')
35     import_base_data(dsn, data_dir, no_partitions)
36
37
38 def create_db(dsn, rouser=None):
39     """ Create a new database for the given DSN. Fails when the database
40         already exists or the PostgreSQL version is too old.
41         Uses `createdb` to create the database.
42
43         If 'rouser' is given, then the function also checks that the user
44         with that given name exists.
45
46         Requires superuser rights by the caller.
47     """
48     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
49
50     if proc.returncode != 0:
51         raise UsageError('Creating new database failed.')
52
53     with connect(dsn) as conn:
54         postgres_version = conn.server_version_tuple()
55         if postgres_version < POSTGRESQL_REQUIRED_VERSION:
56             LOG.fatal('Minimum supported version of Postgresql is %d.%d. '
57                       'Found version %d.%d.',
58                       POSTGRESQL_REQUIRED_VERSION[0], POSTGRESQL_REQUIRED_VERSION[1],
59                       postgres_version[0], postgres_version[1])
60             raise UsageError('PostgreSQL server is too old.')
61
62         if rouser is not None:
63             with conn.cursor() as cur:
64                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
65                                  (rouser, ))
66                 if cnt == 0:
67                     LOG.fatal("Web user '%s' does not exists. Create it with:\n"
68                               "\n      createuser %s", rouser, rouser)
69                     raise UsageError('Missing read-only user.')
70
71
72
73 def setup_extensions(conn):
74     """ Set up all extensions needed for Nominatim. Also checks that the
75         versions of the extensions are sufficient.
76     """
77     with conn.cursor() as cur:
78         cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
79         cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
80     conn.commit()
81
82     postgis_version = conn.postgis_version_tuple()
83     if postgis_version < POSTGIS_REQUIRED_VERSION:
84         LOG.fatal('Minimum supported version of PostGIS is %d.%d. '
85                   'Found version %d.%d.',
86                   POSTGIS_REQUIRED_VERSION[0], POSTGIS_REQUIRED_VERSION[1],
87                   postgis_version[0], postgis_version[1])
88         raise UsageError('PostGIS version is too old.')
89
90
91 def install_module(src_dir, project_dir, module_dir, conn=None):
92     """ Copy the normalization module from src_dir into the project
93         directory under the '/module' directory. If 'module_dir' is set, then
94         use the module from there instead and check that it is accessible
95         for Postgresql.
96
97         The function detects when the installation is run from the
98         build directory. It doesn't touch the module in that case.
99
100         If 'conn' is given, then the function also tests if the module
101         can be access via the given database.
102     """
103     if not module_dir:
104         module_dir = project_dir / 'module'
105
106         if not module_dir.exists() or not src_dir.samefile(module_dir):
107
108             if not module_dir.exists():
109                 module_dir.mkdir()
110
111             destfile = module_dir / 'nominatim.so'
112             shutil.copy(str(src_dir / 'nominatim.so'), str(destfile))
113             destfile.chmod(0o755)
114
115             LOG.info('Database module installed at %s', str(destfile))
116         else:
117             LOG.info('Running from build directory. Leaving database module as is.')
118     else:
119         LOG.info("Using custom path for database module at '%s'", module_dir)
120
121     if conn is not None:
122         with conn.cursor() as cur:
123             try:
124                 cur.execute("""CREATE FUNCTION nominatim_test_import_func(text)
125                                RETURNS text AS '{}/nominatim.so', 'transliteration'
126                                LANGUAGE c IMMUTABLE STRICT;
127                                DROP FUNCTION nominatim_test_import_func(text)
128                             """.format(module_dir))
129             except psycopg2.DatabaseError as err:
130                 LOG.fatal("Error accessing database module: %s", err)
131                 raise UsageError("Database module cannot be accessed.") from err
132
133
134 def import_base_data(dsn, sql_dir, ignore_partitions=False):
135     """ Create and populate the tables with basic static data that provides
136         the background for geocoding. Data is assumed to not yet exist.
137     """
138     db_utils.execute_file(dsn, sql_dir / 'country_name.sql')
139     db_utils.execute_file(dsn, sql_dir / 'country_osm_grid.sql.gz')
140
141     if ignore_partitions:
142         with connect(dsn) as conn:
143             with conn.cursor() as cur:
144                 cur.execute('UPDATE country_name SET partition = 0')
145             conn.commit()
146
147
148 def import_osm_data(osm_file, options, drop=False):
149     """ Import the given OSM file. 'options' contains the list of
150         default settings for osm2pgsql.
151     """
152     options['import_file'] = osm_file
153     options['append'] = False
154     options['threads'] = 1
155
156     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
157         # Make some educated guesses about cache size based on the size
158         # of the import file and the available memory.
159         mem = psutil.virtual_memory()
160         fsize = os.stat(str(osm_file)).st_size
161         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
162                                              fsize * 2) / 1024 / 1024) + 1
163
164     run_osm2pgsql(options)
165
166     with connect(options['dsn']) as conn:
167         with conn.cursor() as cur:
168             cur.execute('SELECT * FROM place LIMIT 1')
169             if cur.rowcount == 0:
170                 raise UsageError('No data imported by osm2pgsql.')
171
172         if drop:
173             conn.drop_table('planet_osm_nodes')
174
175     if drop:
176         if options['flatnode_file']:
177             Path(options['flatnode_file']).unlink()
178
179
180 def truncate_data_tables(conn, max_word_frequency=None):
181     """ Truncate all data tables to prepare for a fresh load.
182     """
183     with conn.cursor() as cur:
184         cur.execute('TRUNCATE word')
185         cur.execute('TRUNCATE placex')
186         cur.execute('TRUNCATE place_addressline')
187         cur.execute('TRUNCATE location_area')
188         cur.execute('TRUNCATE location_area_country')
189         cur.execute('TRUNCATE location_property')
190         cur.execute('TRUNCATE location_property_tiger')
191         cur.execute('TRUNCATE location_property_osmline')
192         cur.execute('TRUNCATE location_postcode')
193         cur.execute('TRUNCATE search_name')
194         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
195         cur.execute('CREATE SEQUENCE seq_place start 100000')
196
197         cur.execute("""SELECT tablename FROM pg_tables
198                        WHERE tablename LIKE 'location_road_%'""")
199
200         for table in [r[0] for r in list(cur)]:
201             cur.execute('TRUNCATE ' + table)
202
203         if max_word_frequency is not None:
204             # Used by getorcreate_word_id to ignore frequent partial words.
205             cur.execute("""CREATE OR REPLACE FUNCTION get_maxwordfreq()
206                            RETURNS integer AS $$
207                              SELECT {} as maxwordfreq;
208                            $$ LANGUAGE SQL IMMUTABLE
209                         """.format(max_word_frequency))
210         conn.commit()
211
212 _COPY_COLUMNS = 'osm_type, osm_id, class, type, name, admin_level, address, extratags, geometry'
213
214 def load_data(dsn, data_dir, threads):
215     """ Copy data into the word and placex table.
216     """
217     # Pre-calculate the most important terms in the word list.
218     db_utils.execute_file(dsn, data_dir / 'words.sql')
219
220     sel = selectors.DefaultSelector()
221     # Then copy data from place to placex in <threads - 1> chunks.
222     place_threads = max(1, threads - 1)
223     for imod in range(place_threads):
224         conn = DBConnection(dsn)
225         conn.connect()
226         conn.perform("""INSERT INTO placex ({0})
227                          SELECT {0} FROM place
228                          WHERE osm_id % {1} = {2}
229                            AND NOT (class='place' and type='houses')
230                            AND ST_IsValid(geometry)
231                      """.format(_COPY_COLUMNS, place_threads, imod))
232         sel.register(conn, selectors.EVENT_READ, conn)
233
234     # Address interpolations go into another table.
235     conn = DBConnection(dsn)
236     conn.connect()
237     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
238                       SELECT osm_id, address, geometry FROM place
239                       WHERE class='place' and type='houses' and osm_type='W'
240                             and ST_GeometryType(geometry) = 'ST_LineString'
241                  """)
242     sel.register(conn, selectors.EVENT_READ, conn)
243
244     # Now wait for all of them to finish.
245     todo = place_threads + 1
246     while todo > 0:
247         for key, _ in sel.select(1):
248             conn = key.data
249             sel.unregister(conn)
250             conn.wait()
251             conn.close()
252             todo -= 1
253         print('.', end='', flush=True)
254     print('\n')
255
256     with connect(dsn) as conn:
257         with conn.cursor() as cur:
258             cur.execute('ANALYSE')