# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Exporting a Nominatim database to SQLite.
"""
from typing import Set, Any, Optional, Union
import datetime as dt
import logging
from pathlib import Path

import sqlalchemy as sa

import nominatim_api as napi
from nominatim_api.search.query_analyzer_factory import make_query_analyzer
from nominatim_api.typing import SaSelect, SaRow
from nominatim_api.sql.sqlalchemy_types import Geometry, IntArray

LOG = logging.getLogger()
 

async def convert(project_dir: Optional[Union[str, Path]],
                  outfile: Path, options: Set[str]) -> None:
    """ Export an existing database to SQLite. The resulting database
        will be usable against the Python frontend of Nominatim.
    """
    api = napi.NominatimAPIAsync(project_dir)

    try:
        outapi = napi.NominatimAPIAsync(project_dir,
                                        {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
                                         'NOMINATIM_DATABASE_RW': '1'})

        try:
            async with api.begin() as src, outapi.begin() as dest:
                writer = SqliteWriter(src, dest, options)
                await writer.write()
        finally:
            await outapi.close()
    finally:
        await api.close()
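
# Illustrative usage sketch (not executed here); the project path, output file
# name and option set are examples only:
#
#   import asyncio
#   asyncio.run(convert('/path/to/project', Path('nominatim.sqlite'), {'search'}))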
 

class SqliteWriter:
    """ Worker class which creates a new SQLite database.
    """

    def __init__(self, src: napi.SearchConnection,
                 dest: napi.SearchConnection, options: Set[str]) -> None:
        self.src = src
        self.dest = dest
        self.options = options
 
    async def write(self) -> None:
        """ Create the database structure and copy the data from
            the source database to the destination.
        """
        LOG.warning('Setting up spatialite')
        # Initialise the SpatiaLite metadata tables (only WGS84 reference systems are needed).
        await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))

        await self.create_tables()
        await self.copy_data()
        if 'search' in self.options:
            await self.create_word_table()
        await self.create_indexes()
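        # Note: the 'search' option above controls the export of search-specific
        # data; without it, the search_name table is dropped from the schema and
        # no word table is created, yielding a reverse/lookup-only database.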
 
    async def create_tables(self) -> None:
        """ Set up the database tables.
        """
        LOG.warning('Setting up tables')
        if 'search' not in self.options:
            self.dest.t.meta.remove(self.dest.t.search_name)
        else:
            await self.create_class_tables()

        await self.dest.connection.run_sync(self.dest.t.meta.create_all)

        # Convert all Geometry columns to SpatiaLite geometries
        for table in self.dest.t.meta.sorted_tables:
            for col in table.c:
                if isinstance(col.type, Geometry):
                    await self.dest.execute(sa.select(
                        sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
                                                      col.type.subtype.upper(), 'XY')))
 
    async def create_class_tables(self) -> None:
        """ Set up the tables that serve class/type-specific geometries.
        """
        sql = sa.text("""SELECT tablename FROM pg_tables
                         WHERE tablename LIKE 'place_classtype_%'""")
        # Register the tables with both the source and the destination metadata:
        # the source definition is needed for copying, the destination one for
        # table creation.
        for res in await self.src.execute(sql):
            for db in (self.src, self.dest):
                sa.Table(res[0], db.t.meta,
                         sa.Column('place_id', sa.BigInteger),
                         sa.Column('centroid', Geometry))
 
    async def create_word_table(self) -> None:
        """ Create the word table.
            This table needs the property information to determine the
            correct format. Therefore it needs to be created after all other
            data has been copied.
        """
        await make_query_analyzer(self.src)
        await make_query_analyzer(self.dest)
        src = self.src.t.meta.tables['word']
        dest = self.dest.t.meta.tables['word']

        await self.dest.connection.run_sync(dest.create)

        LOG.warning("Copying word table")
        async_result = await self.src.connection.stream(sa.select(src))

        async for partition in async_result.partitions(10000):
            data = [{k: getattr(r, k) for k in r._fields} for r in partition]
            await self.dest.execute(dest.insert(), data)

        await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
 
    async def copy_data(self) -> None:
        """ Copy data for all registered tables.
        """
        def _getfield(row: SaRow, key: str) -> Any:
            value = getattr(row, key)
            # Normalise timezone-aware datetimes to UTC before writing to SQLite.
            if isinstance(value, dt.datetime):
                if value.tzinfo is not None:
                    value = value.astimezone(dt.timezone.utc)
            return value

        for table in self.dest.t.meta.sorted_tables:
            LOG.warning("Copying '%s'", table.name)
            async_result = await self.src.connection.stream(self.select_from(table.name))

            async for partition in async_result.partitions(10000):
                # The column 'class' is exposed under the key 'class_' in the
                # destination table definition.
                data = [{('class_' if k == 'class' else k): _getfield(r, k)
                         for k in r._fields}
                        for r in partition]
                await self.dest.execute(table.insert(), data)

        # Set up a minimal copy of pg_tables used to look up the class tables later.
        pg_tables = sa.Table('pg_tables', self.dest.t.meta,
                             sa.Column('schemaname', sa.Text, default='public'),
                             sa.Column('tablename', sa.Text))
        await self.dest.connection.run_sync(pg_tables.create)
        data = [{'tablename': t} for t in self.dest.t.meta.tables]
        await self.dest.execute(pg_tables.insert().values(data))
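        # The frontend can now resolve the class tables against this stand-in
        # exactly as on PostgreSQL, using the same query as create_class_tables():
        #   SELECT tablename FROM pg_tables WHERE tablename LIKE 'place_classtype_%'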
 
    async def create_indexes(self) -> None:
        """ Add indexes necessary for the frontend.
        """
        # Reverse place node lookup needs an extra table to simulate a
        # partial index with adaptive buffering.
        await self.dest.execute(sa.text(
            """ CREATE TABLE placex_place_node_areas AS
                  SELECT place_id, ST_Expand(geometry,
                                             14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
                  FROM placex
                  WHERE rank_address between 5 and 25
                        and osm_type = 'N'
                        and linked_place_id is NULL """))
        await self.dest.execute(sa.select(
            sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
                                          4326, 'GEOMETRY', 'XY')))
        await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
                                             'placex_place_node_areas', 'geometry')))
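        # The ST_Expand buffer above shrinks exponentially with the search rank;
        # for example, rank_search = 16 yields 14.0 * exp(-3.2) - 0.03 ≈ 0.54 degrees.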
 
        await self.create_spatial_index('country_grid', 'geometry')
        await self.create_spatial_index('placex', 'geometry')
        await self.create_spatial_index('osmline', 'linegeo')
        await self.create_spatial_index('tiger', 'linegeo')
        await self.create_index('placex', 'place_id')
        await self.create_index('placex', 'parent_place_id')
        await self.create_index('placex', 'rank_address')
        await self.create_index('addressline', 'place_id')
        await self.create_index('postcode', 'place_id')
        await self.create_index('osmline', 'place_id')
        await self.create_index('tiger', 'place_id')

        if 'search' in self.options:
            await self.create_spatial_index('postcode', 'geometry')
            await self.create_spatial_index('search_name', 'centroid')
            await self.create_index('search_name', 'place_id')
            await self.create_index('osmline', 'parent_place_id')
            await self.create_index('tiger', 'parent_place_id')
            await self.create_search_index()

            for t in self.dest.t.meta.tables:
                if t.startswith('place_classtype_'):
                    await self.dest.execute(sa.select(
                      sa.func.CreateSpatialIndex(t, 'centroid')))
 
    async def create_spatial_index(self, table: str, column: str) -> None:
        """ Create a spatial index on the given table and column.
        """
        await self.dest.execute(sa.select(
                  sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))
 
    async def create_index(self, table_name: str, column: str) -> None:
        """ Create a simple index on the given table and column.
        """
        table = getattr(self.dest.t, table_name)
        await self.dest.connection.run_sync(
            sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
 
    async def create_search_index(self) -> None:
        """ Create the tables and indexes needed for word lookup.
        """
        LOG.warning("Creating reverse search table")
        rsn = sa.Table('reverse_search_name', self.dest.t.meta,
                       sa.Column('word', sa.Integer()),
                       sa.Column('column', sa.Text()),
                       sa.Column('places', IntArray))
        await self.dest.connection.run_sync(rsn.create)

        tsrc = self.src.t.search_name
        for column in ('name_vector', 'nameaddress_vector'):
            sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
                            sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
                    .group_by('word')

            async_result = await self.src.connection.stream(sql)
            async for partition in async_result.partitions(100):
                data = []
                for row in partition:
                    data.append({'word': row.word,
                                 'column': column,
                                 'places': row.places})
                await self.dest.execute(rsn.insert(), data)

        await self.dest.connection.run_sync(
            sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
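        # Each reverse_search_name row maps one word id to all places referencing
        # it in the given column, e.g. (illustrative values only):
        #   {'word': 123, 'column': 'name_vector', 'places': [501, 502, 977]}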
 
    def select_from(self, table: str) -> SaSelect:
        """ Create the SQL statement to select the source columns and rows.
        """
        columns = self.src.t.meta.tables[table].c

        if table == 'placex':
            # SQLite struggles with Geometries that are larger than 5MB,
            # so simplify those.
            return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
                             sa.func.ST_AsText(columns.centroid).label('centroid'),
                             sa.func.ST_AsText(
                               sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
                                        columns.geometry),
                                       else_=sa.func.ST_SimplifyPreserveTopology(
                                                columns.geometry, 0.0001)
                                       )).label('geometry'))

        sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
                          if isinstance(c.type, Geometry) else c for c in columns))

        return sql
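
    # For non-placex tables the generated statement simply converts geometry
    # columns to WKT text, roughly (illustrative, using the osmline table):
    #   SELECT place_id, parent_place_id, ..., ST_AsText(linegeo) AS linegeo
    #   FROM osmline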