]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/database_import.py
Initial implementation of GeoTIFF import functionality
[nominatim.git] / nominatim / tools / database_import.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for setting up and importing a new Nominatim database.
9 """
10 from typing import Tuple, Optional, Union, Sequence, MutableMapping, Any
11 import logging
12 import os
13 import selectors
14 import subprocess
15 from pathlib import Path
16
17 import psutil
18 from psycopg2 import sql as pysql
19
20 from nominatim.config import Configuration
21 from nominatim.db.connection import connect, get_pg_env, Connection
22 from nominatim.db.async_connection import DBConnection
23 from nominatim.db.sql_preprocessor import SQLPreprocessor
24 from nominatim.tools.exec_utils import run_osm2pgsql
25 from nominatim.errors import UsageError
26 from nominatim.version import POSTGRESQL_REQUIRED_VERSION, POSTGIS_REQUIRED_VERSION
27
28 LOG = logging.getLogger()
29
30 def _require_version(module: str, actual: Tuple[int, int], expected: Tuple[int, int]) -> None:
31     """ Compares the version for the given module and raises an exception
32         if the actual version is too old.
33     """
34     if actual < expected:
35         LOG.fatal('Minimum supported version of %s is %d.%d. '
36                   'Found version %d.%d.',
37                   module, expected[0], expected[1], actual[0], actual[1])
38         raise UsageError(f'{module} is too old.')
39
40
41 def setup_database_skeleton(dsn: str, rouser: Optional[str] = None) -> None:
42     """ Create a new database for Nominatim and populate it with the
43         essential extensions.
44
45         The function fails when the database already exists or Postgresql or
46         PostGIS versions are too old.
47
48         Uses `createdb` to create the database.
49
50         If 'rouser' is given, then the function also checks that the user
51         with that given name exists.
52
53         Requires superuser rights by the caller.
54     """
55     proc = subprocess.run(['createdb'], env=get_pg_env(dsn), check=False)
56
57     if proc.returncode != 0:
58         raise UsageError('Creating new database failed.')
59
60     with connect(dsn) as conn:
61         _require_version('PostgreSQL server',
62                          conn.server_version_tuple(),
63                          POSTGRESQL_REQUIRED_VERSION)
64
65         if rouser is not None:
66             with conn.cursor() as cur:
67                 cnt = cur.scalar('SELECT count(*) FROM pg_user where usename = %s',
68                                  (rouser, ))
69                 if cnt == 0:
70                     LOG.fatal("Web user '%s' does not exist. Create it with:\n"
71                               "\n      createuser %s", rouser, rouser)
72                     raise UsageError('Missing read-only user.')
73
74         # Create extensions.
75         with conn.cursor() as cur:
76             cur.execute('CREATE EXTENSION IF NOT EXISTS hstore')
77             cur.execute('CREATE EXTENSION IF NOT EXISTS postgis')
78             cur.execute('CREATE EXTENSION IF NOT EXISTS postgis_raster')
79         conn.commit()
80
81         _require_version('PostGIS',
82                          conn.postgis_version_tuple(),
83                          POSTGIS_REQUIRED_VERSION)
84
85
86 def import_osm_data(osm_files: Union[Path, Sequence[Path]],
87                     options: MutableMapping[str, Any],
88                     drop: bool = False, ignore_errors: bool = False) -> None:
89     """ Import the given OSM files. 'options' contains the list of
90         default settings for osm2pgsql.
91     """
92     options['import_file'] = osm_files
93     options['append'] = False
94     options['threads'] = 1
95
96     if not options['flatnode_file'] and options['osm2pgsql_cache'] == 0:
97         # Make some educated guesses about cache size based on the size
98         # of the import file and the available memory.
99         mem = psutil.virtual_memory()
100         fsize = 0
101         if isinstance(osm_files, list):
102             for fname in osm_files:
103                 fsize += os.stat(str(fname)).st_size
104         else:
105             fsize = os.stat(str(osm_files)).st_size
106         options['osm2pgsql_cache'] = int(min((mem.available + mem.cached) * 0.75,
107                                              fsize * 2) / 1024 / 1024) + 1
108
109     run_osm2pgsql(options)
110
111     with connect(options['dsn']) as conn:
112         if not ignore_errors:
113             with conn.cursor() as cur:
114                 cur.execute('SELECT * FROM place LIMIT 1')
115                 if cur.rowcount == 0:
116                     raise UsageError('No data imported by osm2pgsql.')
117
118         if drop:
119             conn.drop_table('planet_osm_nodes')
120
121     if drop and options['flatnode_file']:
122         Path(options['flatnode_file']).unlink()
123
124
125 def create_tables(conn: Connection, config: Configuration, reverse_only: bool = False) -> None:
126     """ Create the set of basic tables.
127         When `reverse_only` is True, then the main table for searching will
128         be skipped and only reverse search is possible.
129     """
130     sql = SQLPreprocessor(conn, config)
131     sql.env.globals['db']['reverse_only'] = reverse_only
132
133     sql.run_sql_file(conn, 'tables.sql')
134
135
136 def create_table_triggers(conn: Connection, config: Configuration) -> None:
137     """ Create the triggers for the tables. The trigger functions must already
138         have been imported with refresh.create_functions().
139     """
140     sql = SQLPreprocessor(conn, config)
141     sql.run_sql_file(conn, 'table-triggers.sql')
142
143
144 def create_partition_tables(conn: Connection, config: Configuration) -> None:
145     """ Create tables that have explicit partitioning.
146     """
147     sql = SQLPreprocessor(conn, config)
148     sql.run_sql_file(conn, 'partition-tables.src.sql')
149
150
151 def truncate_data_tables(conn: Connection) -> None:
152     """ Truncate all data tables to prepare for a fresh load.
153     """
154     with conn.cursor() as cur:
155         cur.execute('TRUNCATE placex')
156         cur.execute('TRUNCATE place_addressline')
157         cur.execute('TRUNCATE location_area')
158         cur.execute('TRUNCATE location_area_country')
159         cur.execute('TRUNCATE location_property_tiger')
160         cur.execute('TRUNCATE location_property_osmline')
161         cur.execute('TRUNCATE location_postcode')
162         if conn.table_exists('search_name'):
163             cur.execute('TRUNCATE search_name')
164         cur.execute('DROP SEQUENCE IF EXISTS seq_place')
165         cur.execute('CREATE SEQUENCE seq_place start 100000')
166
167         cur.execute("""SELECT tablename FROM pg_tables
168                        WHERE tablename LIKE 'location_road_%'""")
169
170         for table in [r[0] for r in list(cur)]:
171             cur.execute('TRUNCATE ' + table)
172
173     conn.commit()
174
175
176 _COPY_COLUMNS = pysql.SQL(',').join(map(pysql.Identifier,
177                                         ('osm_type', 'osm_id', 'class', 'type',
178                                          'name', 'admin_level', 'address',
179                                          'extratags', 'geometry')))
180
181
182 def load_data(dsn: str, threads: int) -> None:
183     """ Copy data into the word and placex table.
184     """
185     sel = selectors.DefaultSelector()
186     # Then copy data from place to placex in <threads - 1> chunks.
187     place_threads = max(1, threads - 1)
188     for imod in range(place_threads):
189         conn = DBConnection(dsn)
190         conn.connect()
191         conn.perform(
192             pysql.SQL("""INSERT INTO placex ({columns})
193                            SELECT {columns} FROM place
194                            WHERE osm_id % {total} = {mod}
195                              AND NOT (class='place' and (type='houses' or type='postcode'))
196                              AND ST_IsValid(geometry)
197                       """).format(columns=_COPY_COLUMNS,
198                                   total=pysql.Literal(place_threads),
199                                   mod=pysql.Literal(imod)))
200         sel.register(conn, selectors.EVENT_READ, conn)
201
202     # Address interpolations go into another table.
203     conn = DBConnection(dsn)
204     conn.connect()
205     conn.perform("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
206                       SELECT osm_id, address, geometry FROM place
207                       WHERE class='place' and type='houses' and osm_type='W'
208                             and ST_GeometryType(geometry) = 'ST_LineString'
209                  """)
210     sel.register(conn, selectors.EVENT_READ, conn)
211
212     # Now wait for all of them to finish.
213     todo = place_threads + 1
214     while todo > 0:
215         for key, _ in sel.select(1):
216             conn = key.data
217             sel.unregister(conn)
218             conn.wait()
219             conn.close()
220             todo -= 1
221         print('.', end='', flush=True)
222     print('\n')
223
224     with connect(dsn) as syn_conn:
225         with syn_conn.cursor() as cur:
226             cur.execute('ANALYSE')
227
228
229 def create_search_indices(conn: Connection, config: Configuration,
230                           drop: bool = False, threads: int = 1) -> None:
231     """ Create tables that have explicit partitioning.
232     """
233
234     # If index creation failed and left an index invalid, they need to be
235     # cleaned out first, so that the script recreates them.
236     with conn.cursor() as cur:
237         cur.execute("""SELECT relname FROM pg_class, pg_index
238                        WHERE pg_index.indisvalid = false
239                              AND pg_index.indexrelid = pg_class.oid""")
240         bad_indices = [row[0] for row in list(cur)]
241         for idx in bad_indices:
242             LOG.info("Drop invalid index %s.", idx)
243             cur.execute(pysql.SQL('DROP INDEX {}').format(pysql.Identifier(idx)))
244     conn.commit()
245
246     sql = SQLPreprocessor(conn, config)
247
248     sql.run_parallel_sql_file(config.get_libpq_dsn(),
249                               'indices.sql', min(8, threads), drop=drop)
250
251
252 def import_osm_views_geotiff():
253     """Import OSM views GeoTIFF file"""
254     subprocess.run("raster2pgsql -s 4326 -I -C -t 100x100 -e osmviews.tiff public.osmviews | psql nominatim", shell=True, check=True)