]> git.openstreetmap.org Git - nominatim.git/commitdiff
extend sqlite converter for search tables
authorSarah Hoffmann <lonvia@denofr.de>
Wed, 6 Dec 2023 12:42:58 +0000 (13:42 +0100)
committerSarah Hoffmann <lonvia@denofr.de>
Thu, 7 Dec 2023 08:31:00 +0000 (09:31 +0100)
nominatim/db/sqlalchemy_types/int_array.py
nominatim/tools/convert_sqlite.py

index 335d5541972bebad04b7ab79367357bdba49bb29..499376cb85ca59d44119f2bcb4b4e17eeedd2f3f 100644 (file)
@@ -10,6 +10,7 @@ Custom type for an array of integers.
 from typing import Any, List, cast, Optional
 
 import sqlalchemy as sa
+from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.dialects.postgresql import ARRAY
 
 from nominatim.typing import SaDialect, SaColumn
@@ -71,3 +72,16 @@ class IntArray(sa.types.TypeDecorator[Any]):
                 in the array.
             """
             return self.op('&&', is_comparison=True)(other)
+
+
+class ArrayAgg(sa.sql.functions.GenericFunction[Any]):
+    """ SQL aggregate that gathers the values of a group into an
+        integer array.
+    """
+    # Key under which the function is reachable as `sa.func.ArrayAgg`.
+    identifier = 'ArrayAgg'
+    # Function name emitted in the generated SQL.
+    name = 'array_agg'
+    # The result is handled as an array of integers.
+    type = IntArray()
+    inherit_cache = True
+
+@compiles(ArrayAgg, 'sqlite') # type: ignore[no-untyped-call, misc]
+def sqlite_array_agg(element: ArrayAgg, compiler: 'sa.Compiled', **kw: Any) -> str:
+    """ Render ArrayAgg for the SQLite dialect as a group_concat()
+        call that uses a comma as separator.
+    """
+    arguments = compiler.process(element.clauses, **kw)
+    return f"group_concat({arguments}, ',')"
index 16f51b661a6f5849df2c6bd875c76cdc7bb3e025..d9e39ba37402b7a9dcc7455fa1665ce978b82eb9 100644 (file)
@@ -14,7 +14,8 @@ from pathlib import Path
 import sqlalchemy as sa
 
 from nominatim.typing import SaSelect
-from nominatim.db.sqlalchemy_types import Geometry
+from nominatim.db.sqlalchemy_types import Geometry, IntArray
+from nominatim.api.search.query_analyzer_factory import make_query_analyzer
 import nominatim.api as napi
 
 LOG = logging.getLogger()
@@ -54,18 +55,24 @@ class SqliteWriter:
         """ Create the database structure and copy the data from
             the source database to the destination.
         """
+        LOG.warning('Setting up spatialite')
         await self.dest.execute(sa.select(sa.func.InitSpatialMetaData(True, 'WGS84')))
 
         await self.create_tables()
         await self.copy_data()
+        if 'search' in self.options:
+            await self.create_word_table()
         await self.create_indexes()
 
 
     async def create_tables(self) -> None:
         """ Set up the database tables.
         """
+        LOG.warning('Setting up tables')
         if 'search' not in self.options:
             self.dest.t.meta.remove(self.dest.t.search_name)
+        else:
+            await self.create_class_tables()
 
         await self.dest.connection.run_sync(self.dest.t.meta.create_all)
 
@@ -78,6 +85,41 @@ class SqliteWriter:
                                                       col.type.subtype.upper(), 'XY')))
 
 
+    async def create_class_tables(self) -> None:
+        """ Register the tables that serve class/type-specific geometries.
+
+            Every 'place_classtype_*' table listed in pg_tables of the
+            source database is added to the table metadata of both the
+            source and the destination connection. The tables themselves
+            are not created here.
+        """
+        sql = sa.text("""SELECT tablename FROM pg_tables
+                         WHERE tablename LIKE 'place_classtype_%'""")
+        table_names = [row[0] for row in await self.src.execute(sql)]
+
+        for table_name in table_names:
+            for conn in (self.src, self.dest):
+                # Constructing the Table object is what adds it to the metadata.
+                sa.Table(table_name, conn.t.meta,
+                         sa.Column('place_id', sa.BigInteger),
+                         sa.Column('centroid', Geometry))
+
+
+    async def create_word_table(self) -> None:
+        """ Create the word table in the destination database and fill
+            it with the content of the source table.
+
+            This table needs the property information to determine the
+            correct format. It therefore must be done after all other
+            data has been copied.
+        """
+        # The 'word' table is looked up in the metadata right after these
+        # calls, so they are presumably what registers it for each
+        # connection (verify against make_query_analyzer).
+        await make_query_analyzer(self.src)
+        await make_query_analyzer(self.dest)
+        src = self.src.t.meta.tables['word']
+        dest = self.dest.t.meta.tables['word']
+
+        await self.dest.connection.run_sync(dest.create)
+
+        LOG.warning("Copying word table")
+        async_result = await self.src.connection.stream(sa.select(src))
+
+        # Copy the rows over in batches of 10000.
+        async for partition in async_result.partitions(10000):
+            data = [{k: getattr(r, k) for k in r._fields} for r in partition]
+            await self.dest.execute(dest.insert(), data)
+
+        # The token index is created only after all rows have been inserted.
+        await self.dest.connection.run_sync(sa.Index('idx_word_token', dest.c.word_token).create)
+
+
     async def copy_data(self) -> None:
         """ Copy data for all registered tables.
         """
@@ -90,6 +132,14 @@ class SqliteWriter:
                         for r in partition]
                 await self.dest.execute(table.insert(), data)
 
+        # Set up a minimal copy of pg_tables used to look up the class tables later.
+        pg_tables = sa.Table('pg_tables', self.dest.t.meta,
+                             sa.Column('schemaname', sa.Text, default='public'),
+                             sa.Column('tablename', sa.Text))
+        await self.dest.connection.run_sync(pg_tables.create)
+        data = [{'tablename': t} for t in self.dest.t.meta.tables]
+        await self.dest.execute(pg_tables.insert().values(data))
+
 
     async def create_indexes(self) -> None:
         """ Add indexes necessary for the frontend.
@@ -119,6 +169,22 @@ class SqliteWriter:
         await self.create_index('placex', 'parent_place_id')
         await self.create_index('placex', 'rank_address')
         await self.create_index('addressline', 'place_id')
+        await self.create_index('postcode', 'place_id')
+        await self.create_index('osmline', 'place_id')
+        await self.create_index('tiger', 'place_id')
+
+        if 'search' in self.options:
+            await self.create_spatial_index('postcode', 'geometry')
+            await self.create_spatial_index('search_name', 'centroid')
+            await self.create_index('search_name', 'place_id')
+            await self.create_index('osmline', 'parent_place_id')
+            await self.create_index('tiger', 'parent_place_id')
+            await self.create_search_index()
+
+            for t in self.dest.t.meta.tables:
+                if t.startswith('place_classtype_'):
+                    await self.dest.execute(sa.select(
+                      sa.func.CreateSpatialIndex(t, 'centroid')))
 
 
     async def create_spatial_index(self, table: str, column: str) -> None:
@@ -136,6 +202,35 @@ class SqliteWriter:
             sa.Index(f"idx_{table}_{column}", getattr(table.c, column)).create)
 
 
+    async def create_search_index(self) -> None:
+        """ Build the reverse lookup tables for the search name vectors.
+
+            For each of the two vector columns of search_name a table
+            'reverse_search_<column>' is created in the destination. It
+            maps each word to the sorted array of place IDs that have
+            the word in that column. The table is indexed on the word.
+        """
+        search_name = self.src.t.search_name
+        for column in ('name_vector', 'nameaddress_vector'):
+            table_name = f'reverse_search_{column}'
+            LOG.warning("Creating reverse search %s", table_name)
+            reverse_table = sa.Table(table_name, self.dest.t.meta,
+                                     sa.Column('word', sa.Integer()),
+                                     sa.Column('places', IntArray))
+            await self.dest.connection.run_sync(reverse_table.create)
+
+            # Expand the vector into one row per word, then collect the
+            # place IDs per word again.
+            word = sa.func.unnest(getattr(search_name.c, column)).label('word')
+            places = sa.func.ArrayAgg(search_name.c.place_id).label('places')
+            sql = sa.select(word, places).group_by('word')
+
+            stream = await self.src.connection.stream(sql)
+            async for partition in stream.partitions(100):
+                data = [{'word': row.word, 'places': sorted(row.places)}
+                        for row in partition]
+                await self.dest.execute(reverse_table.insert(), data)
+
+            word_index = sa.Index(f'idx_reverse_search_{column}_word',
+                                  reverse_table.c.word)
+            await self.dest.connection.run_sync(word_index.create)
+
+
     def select_from(self, table: str) -> SaSelect:
         """ Create the SQL statement to select the source columns and rows.
         """