1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2023 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Exporting a Nominatim database to SQlite.
 
  10 from typing import Set, Any
 
  13 from pathlib import Path
 
  15 import sqlalchemy as sa
 
  17 from nominatim.typing import SaSelect, SaRow
 
  18 from nominatim.db.sqlalchemy_types import Geometry, IntArray
 
  19 from nominatim.api.search.query_analyzer_factory import make_query_analyzer
 
  20 import nominatim.api as napi
 
  22 LOG = logging.getLogger()
 
  24 async def convert(project_dir: Path, outfile: Path, options: Set[str]) -> None:
 
  25     """ Export an existing database to sqlite. The resulting database
 
  26         will be usable against the Python frontend of Nominatim.
 
  28     api = napi.NominatimAPIAsync(project_dir)
 
  31         outapi = napi.NominatimAPIAsync(project_dir,
 
  32                                         {'NOMINATIM_DATABASE_DSN': f"sqlite:dbname={outfile}",
 
  33                                          'NOMINATIM_DATABASE_RW': '1'})
 
  36             async with api.begin() as src, outapi.begin() as dest:
 
  37                 writer = SqliteWriter(src, dest, options)
 
  46     """ Worker class which creates a new SQLite database.
 
  49     def __init__(self, src: napi.SearchConnection,
 
  50                  dest: napi.SearchConnection, options: Set[str]) -> None:
 
  53         self.options = options
 
  56     async def write(self) -> None:
 
  57         """ Create the database structure and copy the data from
 
  58             the source database to the destination.
 
  60         LOG.warning('Setting up spatialite')
 
  61         await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
 
  63         await self.create_tables()
 
  64         await self.copy_data()
 
  65         if 'search' in self.options:
 
  66             await self.create_word_table()
 
  67         await self.create_indexes()
 
  70     async def create_tables(self) -> None:
 
  71         """ Set up the database tables.
 
  73         LOG.warning('Setting up tables')
 
  74         if 'search' not in self.options:
 
  75             self.dest.t.meta.remove(self.dest.t.search_name)
 
  77             await self.create_class_tables()
 
  79         await self.dest.connection.run_sync(self.dest.t.meta.create_all)
 
  81         # Convert all Geometry columns to Spatialite geometries
 
  82         for table in self.dest.t.meta.sorted_tables:
 
  84                 if isinstance(col.type, Geometry):
 
  85                     await self.dest.execute(sa.select(
 
  86                         sa.func.RecoverGeometryColumn(table.name, col.name, 4326,
 
  87                                                       col.type.subtype.upper(), 'XY')))
 
  90     async def create_class_tables(self) -> None:
 
  91         """ Set up the table that serve class/type-specific geometries.
 
  93         sql = sa.text("""SELECT tablename FROM pg_tables
 
  94                          WHERE tablename LIKE 'place_classtype_%'""")
 
  95         for res in await self.src.execute(sql):
 
  96             for db in (self.src, self.dest):
 
  97                 sa.Table(res[0], db.t.meta,
 
  98                          sa.Column('place_id', sa.BigInteger),
 
  99                          sa.Column('centroid', Geometry))
 
 102     async def create_word_table(self) -> None:
 
 103         """ Create the word table.
 
 104             This table needs the property information to determine the
 
 105             correct format. Therefore needs to be done after all other
 
 106             data has been copied.
 
 108         await make_query_analyzer(self.src)
 
 109         await make_query_analyzer(self.dest)
 
 110         src = self.src.t.meta.tables['word']
 
 111         dest = self.dest.t.meta.tables['word']
 
 113         await self.dest.connection.run_sync(dest.create)
 
 115         LOG.warning("Copying word table")
 
 116         async_result = await self.src.connection.stream(sa.select(src))
 
 118         async for partition in async_result.partitions(10000):
 
 119             data = [{k: getattr(r, k) for k in r._fields} for r in partition]
 
 120             await self.dest.execute(dest.insert(), data)
 
 122         await self.dest.connection.run_sync(sa.Index('idx_word_woken', dest.c.word_token).create)
 
 125     async def copy_data(self) -> None:
 
 126         """ Copy data for all registered tables.
 
 128         def _getfield(row: SaRow, key: str) -> Any:
 
 129             value = getattr(row, key)
 
 130             if isinstance(value, dt.datetime):
 
 131                 if value.tzinfo is not None:
 
 132                     value = value.astimezone(dt.timezone.utc)
 
 135         for table in self.dest.t.meta.sorted_tables:
 
 136             LOG.warning("Copying '%s'", table.name)
 
 137             async_result = await self.src.connection.stream(self.select_from(table.name))
 
 139             async for partition in async_result.partitions(10000):
 
 140                 data = [{('class_' if k == 'class' else k): _getfield(r, k)
 
 143                 await self.dest.execute(table.insert(), data)
 
 145         # Set up a minimal copy of pg_tables used to look up the class tables later.
 
 146         pg_tables = sa.Table('pg_tables', self.dest.t.meta,
 
 147                              sa.Column('schemaname', sa.Text, default='public'),
 
 148                              sa.Column('tablename', sa.Text))
 
 149         await self.dest.connection.run_sync(pg_tables.create)
 
 150         data = [{'tablename': t} for t in self.dest.t.meta.tables]
 
 151         await self.dest.execute(pg_tables.insert().values(data))
 
 154     async def create_indexes(self) -> None:
 
 155         """ Add indexes necessary for the frontend.
 
 157         # reverse place node lookup needs an extra table to simulate a
 
 158         # partial index with adaptive buffering.
 
 159         await self.dest.execute(sa.text(
 
 160             """ CREATE TABLE placex_place_node_areas AS
 
 161                   SELECT place_id, ST_Expand(geometry,
 
 162                                              14.0 * exp(-0.2 * rank_search) - 0.03) as geometry
 
 164                   WHERE rank_address between 5 and 25
 
 166                         and linked_place_id is NULL """))
 
 167         await self.dest.execute(sa.select(
 
 168             sa.func.RecoverGeometryColumn('placex_place_node_areas', 'geometry',
 
 169                                           4326, 'GEOMETRY', 'XY')))
 
 170         await self.dest.execute(sa.select(sa.func.CreateSpatialIndex(
 
 171                                              'placex_place_node_areas', 'geometry')))
 
 174         await self.create_spatial_index('country_grid', 'geometry')
 
 175         await self.create_spatial_index('placex', 'geometry')
 
 176         await self.create_spatial_index('osmline', 'linegeo')
 
 177         await self.create_spatial_index('tiger', 'linegeo')
 
 178         await self.create_index('placex', 'place_id')
 
 179         await self.create_index('placex', 'parent_place_id')
 
 180         await self.create_index('placex', 'rank_address')
 
 181         await self.create_index('addressline', 'place_id')
 
 182         await self.create_index('postcode', 'place_id')
 
 183         await self.create_index('osmline', 'place_id')
 
 184         await self.create_index('tiger', 'place_id')
 
 186         if 'search' in self.options:
 
 187             await self.create_spatial_index('postcode', 'geometry')
 
 188             await self.create_spatial_index('search_name', 'centroid')
 
 189             await self.create_index('search_name', 'place_id')
 
 190             await self.create_index('osmline', 'parent_place_id')
 
 191             await self.create_index('tiger', 'parent_place_id')
 
 192             await self.create_search_index()
 
 194             for t in self.dest.t.meta.tables:
 
 195                 if t.startswith('place_classtype_'):
 
 196                     await self.dest.execute(sa.select(
 
 197                       sa.func.CreateSpatialIndex(t, 'centroid')))
 
 200     async def create_spatial_index(self, table: str, column: str) -> None:
 
 201         """ Create a spatial index on the given table and column.
 
 203         await self.dest.execute(sa.select(
 
 204                   sa.func.CreateSpatialIndex(getattr(self.dest.t, table).name, column)))
 
 207     async def create_index(self, table_name: str, column: str) -> None:
 
 208         """ Create a simple index on the given table and column.
 
 210         table = getattr(self.dest.t, table_name)
 
 211         await self.dest.connection.run_sync(
 
 212             sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
 
 215     async def create_search_index(self) -> None:
 
 216         """ Create the tables and indexes needed for word lookup.
 
 218         LOG.warning("Creating reverse search table")
 
 219         rsn = sa.Table('reverse_search_name', self.dest.t.meta,
 
 220                        sa.Column('word', sa.Integer()),
 
 221                        sa.Column('column', sa.Text()),
 
 222                        sa.Column('places', IntArray))
 
 223         await self.dest.connection.run_sync(rsn.create)
 
 225         tsrc = self.src.t.search_name
 
 226         for column in ('name_vector', 'nameaddress_vector'):
 
 227             sql = sa.select(sa.func.unnest(getattr(tsrc.c, column)).label('word'),
 
 228                             sa.func.ArrayAgg(tsrc.c.place_id).label('places'))\
 
 231             async_result = await self.src.connection.stream(sql)
 
 232             async for partition in async_result.partitions(100):
 
 234                 for row in partition:
 
 236                     data.append({'word': row.word,
 
 238                                  'places': row.places})
 
 239                 await self.dest.execute(rsn.insert(), data)
 
 241         await self.dest.connection.run_sync(
 
 242             sa.Index('idx_reverse_search_name_word', rsn.c.word).create)
 
 245     def select_from(self, table: str) -> SaSelect:
 
 246         """ Create the SQL statement to select the source columns and rows.
 
 248         columns = self.src.t.meta.tables[table].c
 
 250         if table == 'placex':
 
 251             # SQLite struggles with Geometries that are larger than 5MB,
 
 253             return sa.select(*(c for c in columns if not isinstance(c.type, Geometry)),
 
 254                              sa.func.ST_AsText(columns.centroid).label('centroid'),
 
 256                                sa.case((sa.func.ST_MemSize(columns.geometry) < 5000000,
 
 258                                        else_=sa.func.ST_SimplifyPreserveTopology(
 
 259                                                 columns.geometry, 0.0001)
 
 260                                 )).label('geometry'))
 
 262         sql = sa.select(*(sa.func.ST_AsText(c).label(c.name)
 
 263                              if isinstance(c.type, Geometry) else c for c in columns))