1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Main work horse for indexing (computing addresses) the database.
 
  10 from typing import Optional, Any, cast
 
  14 import psycopg2.extras
 
  16 from nominatim.tokenizer.base import AbstractTokenizer
 
  17 from nominatim.indexer.progress import ProgressLogger
 
  18 from nominatim.indexer import runners
 
  19 from nominatim.db.async_connection import DBConnection, WorkerPool
 
  20 from nominatim.db.connection import connect, Connection, Cursor
 
  21 from nominatim.typing import DictCursorResults
 
  23 LOG = logging.getLogger()
 
  27     """ Asynchronous connection that fetches place details for processing.
 
  29     def __init__(self, dsn: str, setup_conn: Connection) -> None:
 
  31         self.current_ids: Optional[DictCursorResults] = None
 
  32         self.conn: Optional[DBConnection] = DBConnection(dsn,
 
  33                                                cursor_factory=psycopg2.extras.DictCursor)
 
  35         with setup_conn.cursor() as cur:
 
  36             # need to fetch those manually because register_hstore cannot
 
  37             # fetch them on an asynchronous connection below.
 
  38             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
 
  39             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
 
  41         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
 
  42                                         array_oid=hstore_array_oid)
 
  44     def close(self) -> None:
 
  45         """ Close the underlying asynchronous connection.
 
  52     def fetch_next_batch(self, cur: Cursor, runner: runners.Runner) -> bool:
 
  53         """ Send a request for the next batch of places.
 
  54             If details for the places are required, they will be fetched
 
  57             Returns true if there is still data available.
 
  59         ids = cast(Optional[DictCursorResults], cur.fetchmany(100))
 
  62             self.current_ids = None
 
  65         assert self.conn is not None
 
  66         self.current_ids = runner.get_place_details(self.conn, ids)
 
  70     def get_batch(self) -> DictCursorResults:
 
  71         """ Get the next batch of data, previously requested with
 
  74         assert self.conn is not None
 
  75         assert self.conn.cursor is not None
 
  77         if self.current_ids is not None and not self.current_ids:
 
  80             self.wait_time += time.time() - tstart
 
  81             self.current_ids = cast(Optional[DictCursorResults],
 
  82                                     self.conn.cursor.fetchall())
 
  84         return self.current_ids if self.current_ids is not None else []
 
  86     def __enter__(self) -> 'PlaceFetcher':
 
  90     def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
 
  91         assert self.conn is not None
 
  97     """ Main indexing routine.
 
 100     def __init__(self, dsn: str, tokenizer: AbstractTokenizer, num_threads: int):
 
 102         self.tokenizer = tokenizer
 
 103         self.num_threads = num_threads
 
 106     def has_pending(self) -> bool:
 
 107         """ Check if any data still needs indexing.
 
 108             This function must only be used after the import has finished.
 
 109             Otherwise it will be very expensive.
 
 111         with connect(self.dsn) as conn:
 
 112             with conn.cursor() as cur:
 
 113                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
 
 114                 return cur.rowcount > 0
 
 117     def index_full(self, analyse: bool = True) -> None:
 
 118         """ Index the complete database. This will first index boundaries
 
 119             followed by all other objects. When `analyse` is True, then the
 
 120             database will be analysed at the appropriate places to
 
 121             ensure that database statistics are updated.
 
 123         with connect(self.dsn) as conn:
 
 124             conn.autocommit = True
 
 126             def _analyze() -> None:
 
 128                     with conn.cursor() as cur:
 
 129                         cur.execute('ANALYZE')
 
 131             if self.index_by_rank(0, 4) > 0:
 
 134             if self.index_boundaries(0, 30) > 100:
 
 137             if self.index_by_rank(5, 25) > 100:
 
 140             if self.index_by_rank(26, 30) > 1000:
 
 143             if self.index_postcodes() > 100:
 
 147     def index_boundaries(self, minrank: int, maxrank: int) -> int:
 
 148         """ Index only administrative boundaries within the given rank range.
 
 151         LOG.warning("Starting indexing boundaries using %s threads",
 
 154         with self.tokenizer.name_analyzer() as analyzer:
 
 155             for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 156                 total += self._index(runners.BoundaryRunner(rank, analyzer))
 
 160     def index_by_rank(self, minrank: int, maxrank: int) -> int:
 
 161         """ Index all entries of placex in the given rank range (inclusive)
 
 162             in order of their address rank.
 
 164             When rank 30 is requested then also interpolations and
 
 165             places with address rank 0 will be indexed.
 
 168         maxrank = min(maxrank, 30)
 
 169         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 170                     minrank, maxrank, self.num_threads)
 
 172         with self.tokenizer.name_analyzer() as analyzer:
 
 173             for rank in range(max(1, minrank), maxrank + 1):
 
 174                 total += self._index(runners.RankRunner(rank, analyzer), 20 if rank == 30 else 1)
 
 177                 total += self._index(runners.RankRunner(0, analyzer))
 
 178                 total += self._index(runners.InterpolationRunner(analyzer), 20)
 
 183     def index_postcodes(self) -> int:
 
 184         """Index the entries of the location_postcode table.
 
 186         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 188         return self._index(runners.PostcodeRunner(), 20)
 
 191     def update_status_table(self) -> None:
 
 192         """ Update the status in the status table to 'indexed'.
 
 194         with connect(self.dsn) as conn:
 
 195             with conn.cursor() as cur:
 
 196                 cur.execute('UPDATE import_status SET indexed = true')
 
 200     def _index(self, runner: runners.Runner, batch: int = 1) -> int:
 
 201         """ Index a single rank or table. `runner` describes the SQL to use
 
 202             for indexing. `batch` describes the number of objects that
 
 203             should be processed with a single SQL statement
 
 205         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
 207         with connect(self.dsn) as conn:
 
 208             psycopg2.extras.register_hstore(conn)
 
 209             with conn.cursor() as cur:
 
 210                 total_tuples = cur.scalar(runner.sql_count_objects())
 
 211                 LOG.debug("Total number of rows: %i", total_tuples)
 
 215             progress = ProgressLogger(runner.name(), total_tuples)
 
 218                 with conn.cursor(name='places') as cur:
 
 219                     cur.execute(runner.sql_get_objects())
 
 221                     with PlaceFetcher(self.dsn, conn) as fetcher:
 
 222                         with WorkerPool(self.dsn, self.num_threads) as pool:
 
 223                             has_more = fetcher.fetch_next_batch(cur, runner)
 
 225                                 places = fetcher.get_batch()
 
 227                                 # asynchronously get the next batch
 
 228                                 has_more = fetcher.fetch_next_batch(cur, runner)
 
 230                                 # And insert the current batch
 
 231                                 for idx in range(0, len(places), batch):
 
 232                                     part = places[idx:idx + batch]
 
 233                                     LOG.debug("Processing places: %s", str(part))
 
 234                                     runner.index_places(pool.next_free_worker(), part)
 
 235                                     progress.add(len(part))
 
 237                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
 
 238                                      fetcher.wait_time, pool.wait_time)
 
 242         return progress.done()