]> git.openstreetmap.org Git - nominatim.git/commitdiff
fetch place info asynchronously
authorSarah Hoffmann <lonvia@denofr.de>
Thu, 29 Apr 2021 20:16:31 +0000 (22:16 +0200)
committerSarah Hoffmann <lonvia@denofr.de>
Fri, 30 Apr 2021 15:41:08 +0000 (17:41 +0200)
nominatim/db/async_connection.py
nominatim/indexer/indexer.py
nominatim/indexer/runners.py

index 361fe0757365c40db30eb4b3e9246ed3564e39ad..a4f554965ab9144a89bb609296ee80ca82049ac3 100644 (file)
@@ -48,14 +48,14 @@ class DBConnection:
     """ A single non-blocking database connection.
     """
 
-    def __init__(self, dsn):
+    def __init__(self, dsn, cursor_factory=None):
         self.current_query = None
         self.current_params = None
         self.dsn = dsn
 
         self.conn = None
         self.cursor = None
-        self.connect()
+        self.connect(cursor_factory=cursor_factory)
 
     def close(self):
         """ Close all open connections. Does not wait for pending requests.
@@ -66,7 +66,7 @@ class DBConnection:
 
         self.conn = None
 
-    def connect(self):
+    def connect(self, cursor_factory=None):
         """ (Re)connect to the database. Creates an asynchronous connection
             with JIT and parallel processing disabled. If a connection was
             already open, it is closed and a new connection established.
@@ -79,7 +79,7 @@ class DBConnection:
         self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True})
         self.wait()
 
-        self.cursor = self.conn.cursor()
+        self.cursor = self.conn.cursor(cursor_factory=cursor_factory)
         # Disable JIT and parallel workers as they are known to cause problems.
         # Update pg_settings instead of using SET because it does not yield
         # errors on older versions of Postgres where the settings are not
index 41535af8f2ae47bc7753d2cba1280fc617d4581a..d685e83a1546097366faa41fe8bc580ad5b4e660 100644 (file)
@@ -3,6 +3,7 @@ Main work horse for indexing (computing addresses) the database.
 """
 import logging
 import select
+import time
 
 import psycopg2.extras
 
@@ -183,6 +184,8 @@ class Indexer:
                 total_tuples = cur.scalar(runner.sql_count_objects())
                 LOG.debug("Total number of rows: %i", total_tuples)
 
+                # need to fetch those manually because register_hstore cannot
+                # fetch them on an asynchronous connection below.
                 hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
                 hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
 
@@ -190,11 +193,14 @@ class Indexer:
 
             progress = ProgressLogger(runner.name(), total_tuples)
 
+            fetcher_wait = 0
+            pool_wait = 0
+
             if total_tuples > 0:
                 with conn.cursor(name='places') as cur:
                     cur.execute(runner.sql_get_objects())
 
-                    fetcher = DBConnection(self.dsn)
+                    fetcher = DBConnection(self.dsn, cursor_factory=psycopg2.extras.DictCursor)
                     psycopg2.extras.register_hstore(fetcher.conn,
                                                     oid=hstore_oid,
                                                     array_oid=hstore_array_oid)
@@ -203,7 +209,9 @@ class Indexer:
                         places = self._fetch_next_batch(cur, fetcher, runner)
                         while places is not None:
                             if not places:
+                                t0 = time.time()
                                 fetcher.wait()
+                                fetcher_wait += time.time() - t0
                                 places = fetcher.cursor.fetchall()
 
                             # asynchronously get the next batch
@@ -211,7 +219,9 @@ class Indexer:
 
                             # And insert the curent batch
                             for idx in range(0, len(places), batch):
+                                t0 = time.time()
                                 worker = pool.next_free_worker()
+                                pool_wait += time.time() - t0
                                 part = places[idx:idx+batch]
                                 LOG.debug("Processing places: %s", str(part))
                                 runner.index_places(worker, part)
@@ -227,10 +237,11 @@ class Indexer:
                 conn.commit()
 
         progress.done()
+        LOG.warning("Wait time: fetcher: {}s,  pool: {}s".format(fetcher_wait, pool_wait))
 
 
     def _fetch_next_batch(self, cur, fetcher, runner):
-        ids = cur.fetchmany(1000)
+        ids = cur.fetchmany(100)
 
         if not ids:
             return None
index 75429fe427428a9bc79b7c9ec9a4f491a873028c..459f80044f90b1dc10aa61ba9741591bf3aca27a 100644 (file)
@@ -11,7 +11,7 @@ import psycopg2.extras
 class AbstractPlacexRunner:
     """ Returns SQL commands for indexing of the placex table.
     """
-    SELECT_SQL = 'SELECT place_id, (placex_prepare_update(placex)).* FROM placex'
+    SELECT_SQL = 'SELECT place_id FROM placex'
 
     def __init__(self, rank, analyzer):
         self.rank = rank
@@ -28,6 +28,12 @@ class AbstractPlacexRunner:
                """.format(','.join(["(%s, %s::hstore, %s::jsonb)"]  * num_places))
 
 
+    def get_place_details(self, worker, ids):
+        worker.perform("""SELECT place_id, (placex_prepare_update(placex)).*
+                          FROM placex WHERE place_id IN %s""",
+                       (tuple((p[0] for p in ids)), ))
+
+
     def index_places(self, worker, places):
         values = []
         for place in places: