1 # SPDX-License-Identifier: GPL-2.0-only
 
   3 # This file is part of Nominatim. (https://nominatim.org)
 
   5 # Copyright (C) 2022 by the Nominatim developer community.
 
   6 # For a full list of authors see the git log.
 
   8 Main work horse for indexing (computing addresses) the database.
 
  13 import psycopg2.extras
 
  15 from nominatim.indexer.progress import ProgressLogger
 
  16 from nominatim.indexer import runners
 
  17 from nominatim.db.async_connection import DBConnection, WorkerPool
 
  18 from nominatim.db.connection import connect
 
  20 LOG = logging.getLogger()
 
  24     """ Asynchronous connection that fetches place details for processing.
 
  26     def __init__(self, dsn, setup_conn):
 
  28         self.current_ids = None
 
  29         self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
 
  31         with setup_conn.cursor() as cur:
 
  32             # need to fetch those manually because register_hstore cannot
 
  33             # fetch them on an asynchronous connection below.
 
  34             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
 
  35             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
 
  37         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
 
  38                                         array_oid=hstore_array_oid)
 
  41         """ Close the underlying asynchronous connection.
 
  48     def fetch_next_batch(self, cur, runner):
 
  49         """ Send a request for the next batch of places.
 
  50             If details for the places are required, they will be fetched
 
  53             Returns true if there is still data available.
 
  55         ids = cur.fetchmany(100)
 
  58             self.current_ids = None
 
  61         if hasattr(runner, 'get_place_details'):
 
  62             runner.get_place_details(self.conn, ids)
 
  65             self.current_ids = ids
 
  70         """ Get the next batch of data, previously requested with
 
  73         if self.current_ids is not None and not self.current_ids:
 
  76             self.wait_time += time.time() - tstart
 
  77             self.current_ids = self.conn.cursor.fetchall()
 
  79         return self.current_ids
 
  85     def __exit__(self, exc_type, exc_value, traceback):
 
  91     """ Main indexing routine.
 
  94     def __init__(self, dsn, tokenizer, num_threads):
 
  96         self.tokenizer = tokenizer
 
  97         self.num_threads = num_threads
 
 100     def has_pending(self):
 
 101         """ Check if any data still needs indexing.
 
 102             This function must only be used after the import has finished.
 
 103             Otherwise it will be very expensive.
 
 105         with connect(self.dsn) as conn:
 
 106             with conn.cursor() as cur:
 
 107                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
 
 108                 return cur.rowcount > 0
 
 111     def index_full(self, analyse=True):
 
 112         """ Index the complete database. This will first index boundaries
 
 113             followed by all other objects. When `analyse` is True, then the
 
 114             database will be analysed at the appropriate places to
 
 115             ensure that database statistics are updated.
 
 117         with connect(self.dsn) as conn:
 
 118             conn.autocommit = True
 
 122                     with conn.cursor() as cur:
 
 123                         cur.execute('ANALYZE')
 
 125             self.index_by_rank(0, 4)
 
 128             self.index_boundaries(0, 30)
 
 131             self.index_by_rank(5, 25)
 
 134             self.index_by_rank(26, 30)
 
 137             self.index_postcodes()
 
 141     def index_boundaries(self, minrank, maxrank):
 
 142         """ Index only administrative boundaries within the given rank range.
 
 144         LOG.warning("Starting indexing boundaries using %s threads",
 
 147         with self.tokenizer.name_analyzer() as analyzer:
 
 148             for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 149                 self._index(runners.BoundaryRunner(rank, analyzer))
 
 151     def index_by_rank(self, minrank, maxrank):
 
 152         """ Index all entries of placex in the given rank range (inclusive)
 
 153             in order of their address rank.
 
 155             When rank 30 is requested then also interpolations and
 
 156             places with address rank 0 will be indexed.
 
 158         maxrank = min(maxrank, 30)
 
 159         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 160                     minrank, maxrank, self.num_threads)
 
 162         with self.tokenizer.name_analyzer() as analyzer:
 
 163             for rank in range(max(1, minrank), maxrank):
 
 164                 self._index(runners.RankRunner(rank, analyzer))
 
 167                 self._index(runners.RankRunner(0, analyzer))
 
 168                 self._index(runners.InterpolationRunner(analyzer), 20)
 
 169                 self._index(runners.RankRunner(30, analyzer), 20)
 
 171                 self._index(runners.RankRunner(maxrank, analyzer))
 
 174     def index_postcodes(self):
 
 175         """Index the entries ofthe location_postcode table.
 
 177         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 179         self._index(runners.PostcodeRunner(), 20)
 
 182     def update_status_table(self):
 
 183         """ Update the status in the status table to 'indexed'.
 
 185         with connect(self.dsn) as conn:
 
 186             with conn.cursor() as cur:
 
 187                 cur.execute('UPDATE import_status SET indexed = true')
 
 191     def _index(self, runner, batch=1):
 
 192         """ Index a single rank or table. `runner` describes the SQL to use
 
 193             for indexing. `batch` describes the number of objects that
 
 194             should be processed with a single SQL statement
 
 196         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
 198         with connect(self.dsn) as conn:
 
 199             psycopg2.extras.register_hstore(conn)
 
 200             with conn.cursor() as cur:
 
 201                 total_tuples = cur.scalar(runner.sql_count_objects())
 
 202                 LOG.debug("Total number of rows: %i", total_tuples)
 
 206             progress = ProgressLogger(runner.name(), total_tuples)
 
 209                 with conn.cursor(name='places') as cur:
 
 210                     cur.execute(runner.sql_get_objects())
 
 212                     with PlaceFetcher(self.dsn, conn) as fetcher:
 
 213                         with WorkerPool(self.dsn, self.num_threads) as pool:
 
 214                             has_more = fetcher.fetch_next_batch(cur, runner)
 
 216                                 places = fetcher.get_batch()
 
 218                                 # asynchronously get the next batch
 
 219                                 has_more = fetcher.fetch_next_batch(cur, runner)
 
 221                                 # And insert the curent batch
 
 222                                 for idx in range(0, len(places), batch):
 
 223                                     part = places[idx:idx + batch]
 
 224                                     LOG.debug("Processing places: %s", str(part))
 
 225                                     runner.index_places(pool.next_free_worker(), part)
 
 226                                     progress.add(len(part))
 
 228                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
 
 229                                      fetcher.wait_time, pool.wait_time)