2 Main work horse for indexing (computing addresses) the database.
 
   7 from nominatim.indexer.progress import ProgressLogger
 
   8 from nominatim.indexer import runners
 
   9 from nominatim.db.async_connection import DBConnection
 
  10 from nominatim.db.connection import connect
 
  12 LOG = logging.getLogger()
 
  15     """ A pool of asynchronous database connections.
 
  17         The pool may be used as a context manager.
 
  19     REOPEN_CONNECTIONS_AFTER = 100000
 
  21     def __init__(self, dsn, pool_size):
 
  22         self.threads = [DBConnection(dsn) for _ in range(pool_size)]
 
  23         self.free_workers = self._yield_free_worker()
 
  27         """ Wait for all connection to finish.
 
  29         for thread in self.threads:
 
  30             while not thread.is_done():
 
  33         self.free_workers = self._yield_free_worker()
 
  36         """ Close all connections and clear the pool.
 
  38         for thread in self.threads:
 
  41         self.free_workers = None
 
  44     def next_free_worker(self):
 
  45         """ Get the next free connection.
 
  47         return next(self.free_workers)
 
  50     def _yield_free_worker(self):
 
  59             if command_stat > self.REOPEN_CONNECTIONS_AFTER:
 
  60                 for thread in self.threads:
 
  61                     while not thread.is_done():
 
  67                 _, ready, _ = select.select([], self.threads, [])
 
  74     def __exit__(self, exc_type, exc_value, traceback):
 
  79     """ Main indexing routine.
 
  82     def __init__(self, dsn, num_threads):
 
  84         self.num_threads = num_threads
 
  87     def index_full(self, analyse=True):
 
  88         """ Index the complete database. This will first index boudnaries
 
  89             followed by all other objects. When `analyse` is True, then the
 
  90             database will be analysed at the appropriate places to
 
  91             ensure that database statistics are updated.
 
  93         with connect(self.dsn) as conn:
 
  94             conn.autocommit = True
 
  98                     with conn.cursor() as cur:
 
  99                         cur.execute('ANALYZE')
 
 104             self.index_by_rank(0, 4)
 
 107             self.index_boundaries(0, 30)
 
 110             self.index_by_rank(5, 25)
 
 113             self.index_by_rank(26, 30)
 
 116             self.index_postcodes()
 
 120     def index_boundaries(self, minrank, maxrank):
 
 121         """ Index only administrative boundaries within the given rank range.
 
 123         LOG.warning("Starting indexing boundaries using %s threads",
 
 126         for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 127             self._index(runners.BoundaryRunner(rank))
 
 129     def index_by_rank(self, minrank, maxrank):
 
 130         """ Index all entries of placex in the given rank range (inclusive)
 
 131             in order of their address rank.
 
 133             When rank 30 is requested then also interpolations and
 
 134             places with address rank 0 will be indexed.
 
 136         maxrank = min(maxrank, 30)
 
 137         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 138                     minrank, maxrank, self.num_threads)
 
 140         for rank in range(max(1, minrank), maxrank):
 
 141             self._index(runners.RankRunner(rank))
 
 144             self._index(runners.RankRunner(0))
 
 145             self._index(runners.InterpolationRunner(), 20)
 
 146             self._index(runners.RankRunner(30), 20)
 
 148             self._index(runners.RankRunner(maxrank))
 
 151     def index_postcodes(self):
 
 152         """Index the entries ofthe location_postcode table.
 
 154         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 156         self._index(runners.PostcodeRunner(), 20)
 
 159     def update_status_table(self):
 
 160         """ Update the status in the status table to 'indexed'.
 
 162         with connect(self.dsn) as conn:
 
 163             with conn.cursor() as cur:
 
 164                 cur.execute('UPDATE import_status SET indexed = true')
 
 168     def _index(self, runner, batch=1):
 
 169         """ Index a single rank or table. `runner` describes the SQL to use
 
 170             for indexing. `batch` describes the number of objects that
 
 171             should be processed with a single SQL statement
 
 173         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
 175         with connect(self.dsn) as conn:
 
 176             with conn.cursor() as cur:
 
 177                 total_tuples = cur.scalar(runner.sql_count_objects())
 
 178                 LOG.debug("Total number of rows: %i", total_tuples)
 
 182             progress = ProgressLogger(runner.name(), total_tuples)
 
 185                 with conn.cursor(name='places') as cur:
 
 186                     cur.execute(runner.sql_get_objects())
 
 188                     with WorkerPool(self.dsn, self.num_threads) as pool:
 
 190                             places = [p[0] for p in cur.fetchmany(batch)]
 
 194                             LOG.debug("Processing places: %s", str(places))
 
 195                             worker = pool.next_free_worker()
 
 197                             worker.perform(runner.sql_index_place(places))
 
 198                             progress.add(len(places))