"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
import select

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
from nominatim.db.connection import connect

LOG = logging.getLogger()
 
class WorkerPool:
    """ A pool of asynchronous database connections.

        The pool may be used as a context manager.
    """
    # Number of commands to issue on a connection before reopening it.
    REOPEN_CONNECTIONS_AFTER = 100000

    def __init__(self, dsn, pool_size):
        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
        self.free_workers = self._yield_free_worker()
 
    def finish_all(self):
        """ Wait for all connections to finish.
        """
        for thread in self.threads:
            while not thread.is_done():
                thread.wait()

        self.free_workers = self._yield_free_worker()
 
    def close(self):
        """ Close all connections and clear the pool.
        """
        for thread in self.threads:
            thread.close()
        self.threads = []
        self.free_workers = None
 
    def next_free_worker(self):
        """ Get the next free connection.
        """
        return next(self.free_workers)
 
    def _yield_free_worker(self):
        ready = self.threads
        command_stat = 0
        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
                # Too many commands have been issued: wait for all workers
                # and reconnect them before handing out more connections.
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                ready = self.threads
                command_stat = 0
            else:
                # Wait until at least one connection is ready for the next
                # command. This works because DBConnection implements fileno().
                _, ready, _ = select.select([], self.threads, [])
 
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
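
# Usage sketch (illustrative, not part of the original module). It assumes
# that DBConnection.perform() issues a statement asynchronously, as in
# nominatim.db.async_connection:
#
#   with WorkerPool(dsn, pool_size=4) as pool:
#       for sql in statements:
#           pool.next_free_worker().perform(sql)
#       pool.finish_all()  # block until all statements have completed
#   # __exit__ then closes the connections.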
 
class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads
 
    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()
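
    # Typical driver sequence (sketch; the calling code is assumed here, the
    # real entry points live elsewhere in the application):
    #
    #   indexer = Indexer(dsn, tokenizer, num_threads=4)
    #   indexer.index_full(analyse=True)
    #   indexer.update_status_table()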
 
    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))
 
    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))
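
    # Example (illustrative): indexer.index_by_rank(26, 30) runs ranks 26..29
    # through the loop above, then takes the maxrank == 30 branch, which also
    # indexes rank-0 places and house-number interpolations with batch size 20.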
 
    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)
 
    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()
 
    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

                # Fetch the hstore OIDs upfront, so that they can be
                # registered on the asynchronous fetcher connection below.
                hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
                hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)
 
            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    fetcher = DBConnection(self.dsn)
                    psycopg2.extras.register_hstore(fetcher.conn,
                                                    oid=hstore_oid,
                                                    array_oid=hstore_array_oid)

                    with WorkerPool(self.dsn, self.num_threads) as pool:
                        places = self._fetch_next_batch(cur, fetcher, runner)
                        while places is not None:
                            if not places:
                                # The asynchronous detail query is still
                                # running. Wait for it and collect the rows.
                                fetcher.wait()
                                places = fetcher.cursor.fetchall()

                            # asynchronously get the next batch
                            next_places = self._fetch_next_batch(cur, fetcher, runner)

                            # And insert the current batch
                            for idx in range(0, len(places), batch):
                                worker = pool.next_free_worker()
                                part = places[idx:idx+batch]
                                LOG.debug("Processing places: %s", str(part))
                                runner.index_places(worker, part)
                                progress.add(len(part))

                            places = next_places

                        pool.finish_all()

                    fetcher.close()

                conn.commit()

        progress.done()
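
    # Batch arithmetic (illustrative): with batch=20, a block of 1000 fetched
    # places is sliced into places[0:20], places[20:40], ..., so 50 multi-row
    # statements are spread across the worker pool instead of the 1000
    # single-row statements produced by the default batch=1.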
 
    def _fetch_next_batch(self, cur, fetcher, runner):
        """ Fetch the next batch of place ids from the server-side cursor.

            Returns None when the cursor is exhausted. If the runner needs
            additional place details, they are requested asynchronously on
            `fetcher` and an empty list is returned; otherwise the ids are
            returned directly.
        """
        ids = cur.fetchmany(1000)

        if not ids:
            return None

        if not hasattr(runner, 'get_place_details'):
            return ids

        runner.get_place_details(fetcher, ids)
        return []
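
# Pipeline overview (sketch, inferred from the code above): _index() overlaps
# three activities. The server-side cursor 'places' streams ids in blocks of
# 1000; the dedicated `fetcher` connection resolves place details for the next
# block while the worker pool still executes statements for the current one;
# the `if not places` branch of the main loop waits for the asynchronous
# detail query, and next_free_worker() may additionally block until one of
# the pool connections becomes free.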