1 # SPDX-License-Identifier: GPL-3.0-or-later
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2024 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Main work horse for indexing (computing addresses) the database.
 
  10 from typing import cast, List, Any, Optional
 
  16 from ..db.connection import connect, execute_scalar
 
  17 from ..db.query_pool import QueryPool
 
  18 from ..tokenizer.base import AbstractTokenizer
 
  19 from .progress import ProgressLogger
 
  22 LOG = logging.getLogger()
 
  26     """ Main indexing routine.
 
  29     def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
 
  31         self.tokenizer = tokenizer
 
  32         self.num_threads = num_threads
 
  34     def has_pending(self) -> bool:
 
  35         """ Check if any data still needs indexing.
 
  36             This function must only be used after the import has finished.
 
  37             Otherwise it will be very expensive.
 
  39         with connect(self.dsn) as conn:
 
  40             with conn.cursor() as cur:
 
  41                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
 
  42                 return cur.rowcount > 0
 
  44     async def index_full(self, analyse: bool = True) -> None:
 
  45         """ Index the complete database. This will first index boundaries
 
  46             followed by all other objects. When `analyse` is True, then the
 
  47             database will be analysed at the appropriate places to
 
  48             ensure that database statistics are updated.
 
  50         with connect(self.dsn) as conn:
 
  51             conn.autocommit = True
 
  53             def _analyze() -> None:
 
  55                     with conn.cursor() as cur:
 
  56                         cur.execute('ANALYZE')
 
  59                 if await self.index_by_rank(0, 4) > 0:
 
  62                 if await self.index_boundaries(0, 30) > 100:
 
  65                 if await self.index_by_rank(5, 25) > 100:
 
  68                 if await self.index_by_rank(26, 30) > 1000:
 
  71                 if await self.index_postcodes() > 100:
 
  74                 if not self.has_pending():
 
  77     async def index_boundaries(self, minrank: int, maxrank: int) -> int:
 
  78         """ Index only administrative boundaries within the given rank range.
 
  81         LOG.warning("Starting indexing boundaries using %s threads",
 
  84         minrank = max(minrank, 4)
 
  85         maxrank = min(maxrank, 25)
 
  87         # Precompute number of rows to process for all rows
 
  88         with connect(self.dsn) as conn:
 
  89             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
 
  90             if hstore_info is None:
 
  91                 raise RuntimeError('Hstore extension is requested but not installed.')
 
  92             psycopg.types.hstore.register_hstore(hstore_info)
 
  94             with conn.cursor() as cur:
 
  95                 cur = conn.execute(""" SELECT rank_search, count(*)
 
  97                                        WHERE rank_search between %s and %s
 
  98                                              AND class = 'boundary' and type = 'administrative'
 
  99                                              AND indexed_status > 0
 
 100                                        GROUP BY rank_search""",
 
 102                 total_tuples = {row.rank_search: row.count for row in cur}
 
 104         with self.tokenizer.name_analyzer() as analyzer:
 
 105             for rank in range(minrank, maxrank + 1):
 
 106                 total += await self._index(runners.BoundaryRunner(rank, analyzer),
 
 107                                            total_tuples=total_tuples.get(rank, 0))
 
 111     async def index_by_rank(self, minrank: int, maxrank: int) -> int:
 
 112         """ Index all entries of placex in the given rank range (inclusive)
 
 113             in order of their address rank.
 
 115             When rank 30 is requested then also interpolations and
 
 116             places with address rank 0 will be indexed.
 
 119         maxrank = min(maxrank, 30)
 
 120         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 121                     minrank, maxrank, self.num_threads)
 
 123         # Precompute number of rows to process for all rows
 
 124         with connect(self.dsn) as conn:
 
 125             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
 
 126             if hstore_info is None:
 
 127                 raise RuntimeError('Hstore extension is requested but not installed.')
 
 128             psycopg.types.hstore.register_hstore(hstore_info)
 
 130             with conn.cursor() as cur:
 
 131                 cur = conn.execute(""" SELECT rank_address, count(*)
 
 133                                        WHERE rank_address between %s and %s
 
 134                                              AND indexed_status > 0
 
 135                                        GROUP BY rank_address""",
 
 137                 total_tuples = {row.rank_address: row.count for row in cur}
 
 139         with self.tokenizer.name_analyzer() as analyzer:
 
 140             for rank in range(max(1, minrank), maxrank + 1):
 
 147                 total += await self._index(runners.RankRunner(rank, analyzer),
 
 148                                            batch=batch, total_tuples=total_tuples.get(rank, 0))
 
 151                 total += await self._index(runners.RankRunner(0, analyzer))
 
 152                 total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
 
 156     async def index_postcodes(self) -> int:
 
 157         """Index the entries of the location_postcode table.
 
 159         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 161         return await self._index(runners.PostcodeRunner(), batch=20)
 
 163     def update_status_table(self) -> None:
 
 164         """ Update the status in the status table to 'indexed'.
 
 166         with connect(self.dsn) as conn:
 
 167             with conn.cursor() as cur:
 
 168                 cur.execute('UPDATE import_status SET indexed = true')
 
 172     async def _index(self, runner: runners.Runner, batch: int = 1,
 
 173                      total_tuples: Optional[int] = None) -> int:
 
 174         """ Index a single rank or table. `runner` describes the SQL to use
 
 175             for indexing. `batch` describes the number of objects that
 
 176             should be processed with a single SQL statement.
 
 178             `total_tuples` may contain the total number of rows to process.
 
 179             When not supplied, the value will be computed using the
 
 180             approriate runner function.
 
 182         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
 184         if total_tuples is None:
 
 185             total_tuples = self._prepare_indexing(runner)
 
 187         progress = ProgressLogger(runner.name(), total_tuples)
 
 190             async with await psycopg.AsyncConnection.connect(
 
 191                                  self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
 
 192                        QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
 
 195                 async with aconn.cursor(name='places') as cur:
 
 196                     query = runner.index_places_query(batch)
 
 197                     params: List[Any] = []
 
 199                     async for place in cur.stream(runner.sql_get_objects()):
 
 200                         fetcher_time += time.time() - tstart
 
 202                         params.extend(runner.index_places_params(place))
 
 205                         if num_places >= batch:
 
 206                             LOG.debug("Processing places: %s", str(params))
 
 207                             await pool.put_query(query, params)
 
 208                             progress.add(num_places)
 
 215                     await pool.put_query(runner.index_places_query(num_places), params)
 
 217             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
 
 218                      fetcher_time, pool.wait_time)
 
 220         return progress.done()
 
 222     def _prepare_indexing(self, runner: runners.Runner) -> int:
 
 223         with connect(self.dsn) as conn:
 
 224             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
 
 225             if hstore_info is None:
 
 226                 raise RuntimeError('Hstore extension is requested but not installed.')
 
 227             psycopg.types.hstore.register_hstore(hstore_info)
 
 229             total_tuples = execute_scalar(conn, runner.sql_count_objects())
 
 230             LOG.debug("Total number of rows: %i", total_tuples)
 
 231         return cast(int, total_tuples)