2 Main work horse for indexing (computing addresses) the database.
 
  10 from nominatim.indexer.progress import ProgressLogger
 
  11 from nominatim.indexer import runners
 
  12 from nominatim.db.async_connection import DBConnection
 
  13 from nominatim.db.connection import connect
 
  15 LOG = logging.getLogger()
 
  19     """ Asynchronous connection that fetches place details for processing.
 
  21     def __init__(self, dsn, setup_conn):
 
  23         self.current_ids = None
 
  24         self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
 
  26         with setup_conn.cursor() as cur:
 
  27             # need to fetch those manually because register_hstore cannot
 
  28             # fetch them on an asynchronous connection below.
 
  29             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
 
  30             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
 
  32         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
 
  33                                         array_oid=hstore_array_oid)
 
  36         """ Close the underlying asynchronous connection.
 
  43     def fetch_next_batch(self, cur, runner):
 
  44         """ Send a request for the next batch of places.
 
  45             If details for the places are required, they will be fetched
 
  48             Returns true if there is still data available.
 
  50         ids = cur.fetchmany(100)
 
  53             self.current_ids = None
 
  56         if hasattr(runner, 'get_place_details'):
 
  57             runner.get_place_details(self.conn, ids)
 
  60             self.current_ids = ids
 
  65         """ Get the next batch of data, previously requested with
 
  68         if self.current_ids is not None and not self.current_ids:
 
  71             self.wait_time += time.time() - tstart
 
  72             self.current_ids = self.conn.cursor.fetchall()
 
  74         return self.current_ids
 
  80     def __exit__(self, exc_type, exc_value, traceback):
 
  85     """ A pool of asynchronous database connections.
 
  87         The pool may be used as a context manager.
 
  89     REOPEN_CONNECTIONS_AFTER = 100000
 
  91     def __init__(self, dsn, pool_size):
 
  92         self.threads = [DBConnection(dsn) for _ in range(pool_size)]
 
  93         self.free_workers = self._yield_free_worker()
 
  98         """ Wait for all connection to finish.
 
 100         for thread in self.threads:
 
 101             while not thread.is_done():
 
 104         self.free_workers = self._yield_free_worker()
 
 107         """ Close all connections and clear the pool.
 
 109         for thread in self.threads:
 
 112         self.free_workers = None
 
 115     def next_free_worker(self):
 
 116         """ Get the next free connection.
 
 118         return next(self.free_workers)
 
 121     def _yield_free_worker(self):
 
 130             if command_stat > self.REOPEN_CONNECTIONS_AFTER:
 
 131                 for thread in self.threads:
 
 132                     while not thread.is_done():
 
 139                 _, ready, _ = select.select([], self.threads, [])
 
 140                 self.wait_time += time.time() - tstart
 
 147     def __exit__(self, exc_type, exc_value, traceback):
 
 153     """ Main indexing routine.
 
 156     def __init__(self, dsn, tokenizer, num_threads):
 
 158         self.tokenizer = tokenizer
 
 159         self.num_threads = num_threads
 
 162     def index_full(self, analyse=True):
 
 163         """ Index the complete database. This will first index boudnaries
 
 164             followed by all other objects. When `analyse` is True, then the
 
 165             database will be analysed at the appropriate places to
 
 166             ensure that database statistics are updated.
 
 168         with connect(self.dsn) as conn:
 
 169             conn.autocommit = True
 
 173                     with conn.cursor() as cur:
 
 174                         cur.execute('ANALYZE')
 
 179             self.index_by_rank(0, 4)
 
 182             self.index_boundaries(0, 30)
 
 185             self.index_by_rank(5, 25)
 
 188             self.index_by_rank(26, 30)
 
 191             self.index_postcodes()
 
 195     def index_boundaries(self, minrank, maxrank):
 
 196         """ Index only administrative boundaries within the given rank range.
 
 198         LOG.warning("Starting indexing boundaries using %s threads",
 
 201         with self.tokenizer.name_analyzer() as analyzer:
 
 202             for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 203                 self._index(runners.BoundaryRunner(rank, analyzer))
 
 205     def index_by_rank(self, minrank, maxrank):
 
 206         """ Index all entries of placex in the given rank range (inclusive)
 
 207             in order of their address rank.
 
 209             When rank 30 is requested then also interpolations and
 
 210             places with address rank 0 will be indexed.
 
 212         maxrank = min(maxrank, 30)
 
 213         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 214                     minrank, maxrank, self.num_threads)
 
 216         with self.tokenizer.name_analyzer() as analyzer:
 
 217             for rank in range(max(1, minrank), maxrank):
 
 218                 self._index(runners.RankRunner(rank, analyzer))
 
 221                 self._index(runners.RankRunner(0, analyzer))
 
 222                 self._index(runners.InterpolationRunner(analyzer), 20)
 
 223                 self._index(runners.RankRunner(30, analyzer), 20)
 
 225                 self._index(runners.RankRunner(maxrank, analyzer))
 
 228     def index_postcodes(self):
 
 229         """Index the entries ofthe location_postcode table.
 
 231         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 233         self._index(runners.PostcodeRunner(), 20)
 
 236     def update_status_table(self):
 
 237         """ Update the status in the status table to 'indexed'.
 
 239         with connect(self.dsn) as conn:
 
 240             with conn.cursor() as cur:
 
 241                 cur.execute('UPDATE import_status SET indexed = true')
 
 245     def _index(self, runner, batch=1):
 
 246         """ Index a single rank or table. `runner` describes the SQL to use
 
 247             for indexing. `batch` describes the number of objects that
 
 248             should be processed with a single SQL statement
 
 250         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
 252         with connect(self.dsn) as conn:
 
 253             psycopg2.extras.register_hstore(conn)
 
 254             with conn.cursor() as cur:
 
 255                 total_tuples = cur.scalar(runner.sql_count_objects())
 
 256                 LOG.debug("Total number of rows: %i", total_tuples)
 
 260             progress = ProgressLogger(runner.name(), total_tuples)
 
 263                 with conn.cursor(name='places') as cur:
 
 264                     cur.execute(runner.sql_get_objects())
 
 266                     with PlaceFetcher(self.dsn, conn) as fetcher:
 
 267                         with WorkerPool(self.dsn, self.num_threads) as pool:
 
 268                             has_more = fetcher.fetch_next_batch(cur, runner)
 
 270                                 places = fetcher.get_batch()
 
 272                                 # asynchronously get the next batch
 
 273                                 has_more = fetcher.fetch_next_batch(cur, runner)
 
 275                                 # And insert the curent batch
 
 276                                 for idx in range(0, len(places), batch):
 
 277                                     part = places[idx:idx+batch]
 
 278                                     LOG.debug("Processing places: %s", str(part))
 
 279                                     runner.index_places(pool.next_free_worker(), part)
 
 280                                     progress.add(len(part))
 
 282                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
 
 283                                      fetcher.wait_time, pool.wait_time)