2 Main work horse for indexing (computing addresses) the database.
 
   9 from nominatim.indexer.progress import ProgressLogger
 
  10 from nominatim.indexer import runners
 
  11 from nominatim.db.async_connection import DBConnection, WorkerPool
 
  12 from nominatim.db.connection import connect
 
  14 LOG = logging.getLogger()
 
  18     """ Asynchronous connection that fetches place details for processing.
 
  20     def __init__(self, dsn, setup_conn):
 
  22         self.current_ids = None
 
  23         self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
 
  25         with setup_conn.cursor() as cur:
 
  26             # need to fetch those manually because register_hstore cannot
 
  27             # fetch them on an asynchronous connection below.
 
  28             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
 
  29             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
 
  31         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
 
  32                                         array_oid=hstore_array_oid)
 
  35         """ Close the underlying asynchronous connection.
 
  42     def fetch_next_batch(self, cur, runner):
 
  43         """ Send a request for the next batch of places.
 
  44             If details for the places are required, they will be fetched
 
  47             Returns true if there is still data available.
 
  49         ids = cur.fetchmany(100)
 
  52             self.current_ids = None
 
  55         if hasattr(runner, 'get_place_details'):
 
  56             runner.get_place_details(self.conn, ids)
 
  59             self.current_ids = ids
 
  64         """ Get the next batch of data, previously requested with
 
  67         if self.current_ids is not None and not self.current_ids:
 
  70             self.wait_time += time.time() - tstart
 
  71             self.current_ids = self.conn.cursor.fetchall()
 
  73         return self.current_ids
 
  79     def __exit__(self, exc_type, exc_value, traceback):
 
  85     """ Main indexing routine.
 
  88     def __init__(self, dsn, tokenizer, num_threads):
 
  90         self.tokenizer = tokenizer
 
  91         self.num_threads = num_threads
 
  94     def has_pending(self):
 
  95         """ Check if any data still needs indexing.
 
  96             This function must only be used after the import has finished.
 
  97             Otherwise it will be very expensive.
 
  99         with connect(self.dsn) as conn:
 
 100             with conn.cursor() as cur:
 
 101                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
 
 102                 return cur.rowcount > 0
 
 105     def index_full(self, analyse=True):
 
 106         """ Index the complete database. This will first index boundaries
 
 107             followed by all other objects. When `analyse` is True, then the
 
 108             database will be analysed at the appropriate places to
 
 109             ensure that database statistics are updated.
 
 111         with connect(self.dsn) as conn:
 
 112             conn.autocommit = True
 
 116                     with conn.cursor() as cur:
 
 117                         cur.execute('ANALYZE')
 
 119             self.index_by_rank(0, 4)
 
 122             self.index_boundaries(0, 30)
 
 125             self.index_by_rank(5, 25)
 
 128             self.index_by_rank(26, 30)
 
 131             self.index_postcodes()
 
 135     def index_boundaries(self, minrank, maxrank):
 
 136         """ Index only administrative boundaries within the given rank range.
 
 138         LOG.warning("Starting indexing boundaries using %s threads",
 
 141         with self.tokenizer.name_analyzer() as analyzer:
 
 142             for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 143                 self._index(runners.BoundaryRunner(rank, analyzer))
 
 145     def index_by_rank(self, minrank, maxrank):
 
 146         """ Index all entries of placex in the given rank range (inclusive)
 
 147             in order of their address rank.
 
 149             When rank 30 is requested then also interpolations and
 
 150             places with address rank 0 will be indexed.
 
 152         maxrank = min(maxrank, 30)
 
 153         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 154                     minrank, maxrank, self.num_threads)
 
 156         with self.tokenizer.name_analyzer() as analyzer:
 
 157             for rank in range(max(1, minrank), maxrank):
 
 158                 self._index(runners.RankRunner(rank, analyzer))
 
 161                 self._index(runners.RankRunner(0, analyzer))
 
 162                 self._index(runners.InterpolationRunner(analyzer), 20)
 
 163                 self._index(runners.RankRunner(30, analyzer), 20)
 
 165                 self._index(runners.RankRunner(maxrank, analyzer))
 
 168     def index_postcodes(self):
 
 169         """Index the entries ofthe location_postcode table.
 
 171         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 173         self._index(runners.PostcodeRunner(), 20)
 
 176     def update_status_table(self):
 
 177         """ Update the status in the status table to 'indexed'.
 
 179         with connect(self.dsn) as conn:
 
 180             with conn.cursor() as cur:
 
 181                 cur.execute('UPDATE import_status SET indexed = true')
 
 185     def _index(self, runner, batch=1):
 
 186         """ Index a single rank or table. `runner` describes the SQL to use
 
 187             for indexing. `batch` describes the number of objects that
 
 188             should be processed with a single SQL statement
 
 190         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
 192         with connect(self.dsn) as conn:
 
 193             psycopg2.extras.register_hstore(conn)
 
 194             with conn.cursor() as cur:
 
 195                 total_tuples = cur.scalar(runner.sql_count_objects())
 
 196                 LOG.debug("Total number of rows: %i", total_tuples)
 
 200             progress = ProgressLogger(runner.name(), total_tuples)
 
 203                 with conn.cursor(name='places') as cur:
 
 204                     cur.execute(runner.sql_get_objects())
 
 206                     with PlaceFetcher(self.dsn, conn) as fetcher:
 
 207                         with WorkerPool(self.dsn, self.num_threads) as pool:
 
 208                             has_more = fetcher.fetch_next_batch(cur, runner)
 
 210                                 places = fetcher.get_batch()
 
 212                                 # asynchronously get the next batch
 
 213                                 has_more = fetcher.fetch_next_batch(cur, runner)
 
 215                                 # And insert the curent batch
 
 216                                 for idx in range(0, len(places), batch):
 
 217                                     part = places[idx:idx + batch]
 
 218                                     LOG.debug("Processing places: %s", str(part))
 
 219                                     runner.index_places(pool.next_free_worker(), part)
 
 220                                     progress.add(len(part))
 
 222                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
 
 223                                      fetcher.wait_time, pool.wait_time)