]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/database_import.py
Added version check for PostGis and Postgres
[nominatim.git] / nominatim / tools / database_import.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for setting up and importing a new Nominatim database.
9 """
10 from typing import Tuple, Optional, Union, Sequence, MutableMapping, Any
11 import logging
12 import os
13 import selectors
14 import subprocess
15 from pathlib import Path
16
17 import psutil
18 from psycopg2 import sql as pysql
19
20 from nominatim.config import Configuration
21 from nominatim.db.connection import connect, get_pg_env, Connection
22 from nominatim.db.async_connection import DBConnection
23 from nominatim.db.sql_preprocessor import SQLPreprocessor
24 from nominatim.tools.exec_utils import run_osm2pgsql
25 from nominatim.errors import UsageError
26 from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
27
28 LOG = logging.getLogger()
29
30 def _require_version(module: str, actual: Tuple[int, int], expected: Tuple[int, int]) -> None:
31     """ Compares the version for the given module and raises an exception
32         if the actual version is too old.
33     """
34     if actual < expected:
35         LOG.fatal('Minimum supported version of %s is %d.%d. '
36                   'Found version %d.%d.',
37                   module, expected[0], expected[1], actual[0], actual[1])
38         raise UsageError(f'{module} is too old.')
39
40
41 def check_existing_database_plugins(dsn: str):
42     with connect(dsn) as conn:
43         _require_version('PostgreSQL server',
44                          conn.server_version_tuple(),
45                          POSTGRESQL_REQUIRED_VERSION)
46         _require_version('PostGIS',
47                          conn.postgis_version_tuple(),
48                          POSTGIS_REQUIRED_VERSION)
49
50
51 def setup_database_skeleton(dsn: str, rouser: Optional[str] = None) -> None:
52     """ Create a new database for Nominatim and populate it with the
53         essential extensions.
54
55         The function fails when the database already exists or Postgresql or
56         PostGIS versions are too old.
57
58         Uses `createdb` to create the database.
59
60         If 'rouser' is given, then the function also checks that the user
61         with that given name exists.
62
63         Requires superuser rights by the caller.
64     """
65     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
66
67     if proc.returncode != 0:
68         raise UsageError('Creating new database failed.')
69
70     with connect(dsn) as conn:
71         _require_version('PostgreSQL server',
72                          conn.server_version_tuple(),
73                          POSTGRESQL_REQUIRED_VERSION)
74
75         if rouser is not None:
76             with conn.cursor() as cur:
77                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
78                                  (rouser, ))
79                 if cnt == 0:
80                     LOG.fatal("Web user '%s' does not exist. Create it with:\n"
81                               "\n      createuser %s", rouser, rouser)
82                     raise UsageError('Missing read-only user.')
83
84         # Create extensions.
85         with conn.cursor() as cur:
86             cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
87             cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
88
89             postgis_version = conn.postgis_version_tuple()
90             if postgis_version[0] >= 3:
91                 cur.execute('CREATE EXTENSION IF NOT EXISTS postgis_raster')
92
93         conn.commit()
94
95         _require_version('PostGIS',
96                          conn.postgis_version_tuple(),
97                          POSTGIS_REQUIRED_VERSION)
98
99
100 def import_osm_data(osm_files: Union[Path, Sequence[Path]],
101                     options: MutableMapping[str, Any],
102                     drop: bool = False, ignore_errors: bool = False) -> None:
103     """ Import the given OSM files. 'options' contains the list of
104         default settings for osm2pgsql.
105     """
106     options['import_file'] = osm_files
107     options['append'] = False
108     options['threads'] = 1
109
110     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
111         # Make some educated guesses about cache size based on the size
112         # of the import file and the available memory.
113         mem = psutil.virtual_memory()
114         fsize = 0
115         if isinstance(osm_files, list):
116             for fname in osm_files:
117                 fsize += os.stat(str(fname)).st_size
118         else:
119             fsize = os.stat(str(osm_files)).st_size
120         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
121                                              fsize * 2) / 1024 / 1024) + 1
122
123     run_osm2pgsql(options)
124
125     with connect(options['dsn']) as conn:
126         if not ignore_errors:
127             with conn.cursor() as cur:
128                 cur.execute('SELECT * FROM place LIMIT 1')
129                 if cur.rowcount == 0:
130                     raise UsageError('No data imported by osm2pgsql.')
131
132         if drop:
133             conn.drop_table('planet_osm_nodes')
134
135     if drop and options['flatnode_file']:
136         Path(options['flatnode_file']).unlink()
137
138
139 def create_tables(conn: Connection, config: Configuration, reverse_only: bool = False) -> None:
140     """ Create the set of basic tables.
141         When `reverse_only` is True, then the main table for searching will
142         be skipped and only reverse search is possible.
143     """
144     sql = SQLPreprocessor(conn, config)
145     sql.env.globals['db']['reverse_only'] = reverse_only
146
147     sql.run_sql_file(conn, 'tables.sql')
148
149
150 def create_table_triggers(conn: Connection, config: Configuration) -> None:
151     """ Create the triggers for the tables. The trigger functions must already
152         have been imported with refresh.create_functions().
153     """
154     sql = SQLPreprocessor(conn, config)
155     sql.run_sql_file(conn, 'table-triggers.sql')
156
157
158 def create_partition_tables(conn: Connection, config: Configuration) -> None:
159     """ Create tables that have explicit partitioning.
160     """
161     sql = SQLPreprocessor(conn, config)
162     sql.run_sql_file(conn, 'partition-tables.src.sql')
163
164
165 def truncate_data_tables(conn: Connection) -> None:
166     """ Truncate all data tables to prepare for a fresh load.
167     """
168     with conn.cursor() as cur:
169         cur.execute('TRUNCATE placex')
170         cur.execute('TRUNCATE place_addressline')
171         cur.execute('TRUNCATE location_area')
172         cur.execute('TRUNCATE location_area_country')
173         cur.execute('TRUNCATE location_property_tiger')
174         cur.execute('TRUNCATE location_property_osmline')
175         cur.execute('TRUNCATE location_postcode')
176         if conn.table_exists('search_name'):
177             cur.execute('TRUNCATE search_name')
178         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
179         cur.execute('CREATE SEQUENCE seq_place start 100000')
180
181         cur.execute("""SELECT tablename FROM pg_tables
182                        WHERE tablename LIKE 'location_road_%'""")
183
184         for table in [r[0] for r in list(cur)]:
185             cur.execute('TRUNCATE ' + table)
186
187     conn.commit()
188
189
190 _COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier,
191                                         ('osm_type', 'osm_id', 'class', 'type',
192                                          'name', 'admin_level', 'address',
193                                          'extratags', 'geometry')))
194
195
196 def load_data(dsn: str, threads: int) -> None:
197     """ Copy data into the word and placex table.
198     """
199     sel = selectors.DefaultSelector()
200     # Then copy data from place to placex in <threads - 1> chunks.
201     place_threads = max(1, threads - 1)
202     for imod in range(place_threads):
203         conn = DBConnection(dsn)
204         conn.connect()
205         conn.perform(
206             pysql.SQL("""INSERT INTO placex ({columns})
207                            SELECT {columns} FROM place
208                            WHERE osm_id % {total} = {mod}
209                              AND NOT (class='place' and (type='houses' or type='postcode'))
210                              AND ST_IsValid(geometry)
211                       """).format(columns=_COPY_COLUMNS,
212                                   total=pysql.Literal(place_threads),
213                                   mod=pysql.Literal(imod)))
214         sel.register(conn, selectors.EVENT_READ, conn)
215
216     # Address interpolations go into another table.
217     conn = DBConnection(dsn)
218     conn.connect()
219     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
220                       SELECT osm_id, address, geometry FROM place
221                       WHERE class='place' and type='houses' and osm_type='W'
222                             and ST_GeometryType(geometry) = 'ST_LineString'
223                  """)
224     sel.register(conn, selectors.EVENT_READ, conn)
225
226     # Now wait for all of them to finish.
227     todo = place_threads + 1
228     while todo > 0:
229         for key, _ in sel.select(1):
230             conn = key.data
231             sel.unregister(conn)
232             conn.wait()
233             conn.close()
234             todo -= 1
235         print('.', end='', flush=True)
236     print('\n')
237
238     with connect(dsn) as syn_conn:
239         with syn_conn.cursor() as cur:
240             cur.execute('ANALYSE')
241
242
243 def create_search_indices(conn: Connection, config: Configuration,
244                           drop: bool = False, threads: int = 1) -> None:
245     """ Create tables that have explicit partitioning.
246     """
247
248     # If index creation failed and left an index invalid, they need to be
249     # cleaned out first, so that the script recreates them.
250     with conn.cursor() as cur:
251         cur.execute("""SELECT relname FROM pg_class, pg_index
252                        WHERE pg_index.indisvalid = false
253                              AND pg_index.indexrelid = pg_class.oid""")
254         bad_indices = [row[0] for row in list(cur)]
255         for idx in bad_indices:
256             LOG.info("Drop invalid index %s.", idx)
257             cur.execute(pysql.SQL('DROP INDEX {}').format(pysql.Identifier(idx)))
258     conn.commit()
259
260     sql = SQLPreprocessor(conn, config)
261
262     sql.run_parallel_sql_file(config.get_libpq_dsn(),
263                               'indices.sql', min(8, threads), drop=drop)