From: Sarah Hoffmann Date: Thu, 29 Apr 2021 19:57:43 +0000 (+0200) Subject: indexer: fetch ids in batches X-Git-Tag: v4.0.0~93^2~7 X-Git-Url: https://git.openstreetmap.org/nominatim.git/commitdiff_plain/602728895e7c35ed1f434acb8af6acfaa66033f3?ds=sidebyside indexer: fetch ids in batches --- diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index d0c6ea0c..41535af8 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -183,6 +183,9 @@ class Indexer: total_tuples = cur.scalar(runner.sql_count_objects()) LOG.debug("Total number of rows: %i", total_tuples) + hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid") + hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid") + conn.commit() progress = ProgressLogger(runner.name(), total_tuples) @@ -191,20 +194,49 @@ class Indexer: with conn.cursor(name='places') as cur: cur.execute(runner.sql_get_objects()) + fetcher = DBConnection(self.dsn) + psycopg2.extras.register_hstore(fetcher.conn, + oid=hstore_oid, + array_oid=hstore_array_oid) + with WorkerPool(self.dsn, self.num_threads) as pool: - while True: - places = cur.fetchmany(batch) + places = self._fetch_next_batch(cur, fetcher, runner) + while places is not None: if not places: - break + fetcher.wait() + places = fetcher.cursor.fetchall() + + # asynchronously get the next batch + next_places = self._fetch_next_batch(cur, fetcher, runner) - LOG.debug("Processing places: %s", str(places)) - worker = pool.next_free_worker() + # And insert the curent batch + for idx in range(0, len(places), batch): + worker = pool.next_free_worker() + part = places[idx:idx+batch] + LOG.debug("Processing places: %s", str(part)) + runner.index_places(worker, part) + progress.add(len(part)) - runner.index_places(worker, places) - progress.add(len(places)) + places = next_places pool.finish_all() + fetcher.wait() + fetcher.close() + conn.commit() progress.done() + + + def _fetch_next_batch(self, cur, fetcher, runner): + ids = cur.fetchmany(1000) + + if not ids: + return None + + if not hasattr(runner, 'get_place_details'): + return ids + + runner.get_place_details(fetcher, ids) + return []