2 Main work horse for indexing (computing addresses) the database.
 
   9 from nominatim.indexer.progress import ProgressLogger
 
  10 from nominatim.indexer import runners
 
  11 from nominatim.db.async_connection import DBConnection, WorkerPool
 
  12 from nominatim.db.connection import connect
 
  14 LOG = logging.getLogger()
 
  18     """ Asynchronous connection that fetches place details for processing.
 
  20     def __init__(self, dsn, setup_conn):
 
  22         self.current_ids = None
 
  23         self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
 
  25         with setup_conn.cursor() as cur:
 
  26             # need to fetch those manually because register_hstore cannot
 
  27             # fetch them on an asynchronous connection below.
 
  28             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
 
  29             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
 
  31         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
 
  32                                         array_oid=hstore_array_oid)
 
  35         """ Close the underlying asynchronous connection.
 
  42     def fetch_next_batch(self, cur, runner):
 
  43         """ Send a request for the next batch of places.
 
  44             If details for the places are required, they will be fetched
 
  47             Returns true if there is still data available.
 
  49         ids = cur.fetchmany(100)
 
  52             self.current_ids = None
 
  55         if hasattr(runner, 'get_place_details'):
 
  56             runner.get_place_details(self.conn, ids)
 
  59             self.current_ids = ids
 
  64         """ Get the next batch of data, previously requested with
 
  67         if self.current_ids is not None and not self.current_ids:
 
  70             self.wait_time += time.time() - tstart
 
  71             self.current_ids = self.conn.cursor.fetchall()
 
  73         return self.current_ids
 
  79     def __exit__(self, exc_type, exc_value, traceback):
 
  85     """ Main indexing routine.
 
  88     def __init__(self, dsn, tokenizer, num_threads):
 
  90         self.tokenizer = tokenizer
 
  91         self.num_threads = num_threads
 
  94     def index_full(self, analyse=True):
 
  95         """ Index the complete database. This will first index boudnaries
 
  96             followed by all other objects. When `analyse` is True, then the
 
  97             database will be analysed at the appropriate places to
 
  98             ensure that database statistics are updated.
 
 100         with connect(self.dsn) as conn:
 
 101             conn.autocommit = True
 
 105                     with conn.cursor() as cur:
 
 106                         cur.execute('ANALYZE')
 
 111             self.index_by_rank(0, 4)
 
 114             self.index_boundaries(0, 30)
 
 117             self.index_by_rank(5, 25)
 
 120             self.index_by_rank(26, 30)
 
 123             self.index_postcodes()
 
 127     def index_boundaries(self, minrank, maxrank):
 
 128         """ Index only administrative boundaries within the given rank range.
 
 130         LOG.warning("Starting indexing boundaries using %s threads",
 
 133         with self.tokenizer.name_analyzer() as analyzer:
 
 134             for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 135                 self._index(runners.BoundaryRunner(rank, analyzer))
 
 137     def index_by_rank(self, minrank, maxrank):
 
 138         """ Index all entries of placex in the given rank range (inclusive)
 
 139             in order of their address rank.
 
 141             When rank 30 is requested then also interpolations and
 
 142             places with address rank 0 will be indexed.
 
 144         maxrank = min(maxrank, 30)
 
 145         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 146                     minrank, maxrank, self.num_threads)
 
 148         with self.tokenizer.name_analyzer() as analyzer:
 
 149             for rank in range(max(1, minrank), maxrank):
 
 150                 self._index(runners.RankRunner(rank, analyzer))
 
 153                 self._index(runners.RankRunner(0, analyzer))
 
 154                 self._index(runners.InterpolationRunner(analyzer), 20)
 
 155                 self._index(runners.RankRunner(30, analyzer), 20)
 
 157                 self._index(runners.RankRunner(maxrank, analyzer))
 
 160     def index_postcodes(self):
 
 161         """Index the entries ofthe location_postcode table.
 
 163         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 165         self._index(runners.PostcodeRunner(), 20)
 
 168     def update_status_table(self):
 
 169         """ Update the status in the status table to 'indexed'.
 
 171         with connect(self.dsn) as conn:
 
 172             with conn.cursor() as cur:
 
 173                 cur.execute('UPDATE import_status SET indexed = true')
 
 177     def _index(self, runner, batch=1):
 
 178         """ Index a single rank or table. `runner` describes the SQL to use
 
 179             for indexing. `batch` describes the number of objects that
 
 180             should be processed with a single SQL statement
 
 182         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
 184         with connect(self.dsn) as conn:
 
 185             psycopg2.extras.register_hstore(conn)
 
 186             with conn.cursor() as cur:
 
 187                 total_tuples = cur.scalar(runner.sql_count_objects())
 
 188                 LOG.debug("Total number of rows: %i", total_tuples)
 
 192             progress = ProgressLogger(runner.name(), total_tuples)
 
 195                 with conn.cursor(name='places') as cur:
 
 196                     cur.execute(runner.sql_get_objects())
 
 198                     with PlaceFetcher(self.dsn, conn) as fetcher:
 
 199                         with WorkerPool(self.dsn, self.num_threads) as pool:
 
 200                             has_more = fetcher.fetch_next_batch(cur, runner)
 
 202                                 places = fetcher.get_batch()
 
 204                                 # asynchronously get the next batch
 
 205                                 has_more = fetcher.fetch_next_batch(cur, runner)
 
 207                                 # And insert the curent batch
 
 208                                 for idx in range(0, len(places), batch):
 
 209                                     part = places[idx:idx+batch]
 
 210                                     LOG.debug("Processing places: %s", str(part))
 
 211                                     runner.index_places(pool.next_free_worker(), part)
 
 212                                     progress.add(len(part))
 
 214                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
 
 215                                      fetcher.wait_time, pool.wait_time)