git.openstreetmap.org Git - nominatim.git/commitdiff
indexer: allow batch processing of places
author: Sarah Hoffmann <lonvia@denofr.de>
Wed, 29 Jul 2020 15:20:30 +0000 (17:20 +0200)
committer: Sarah Hoffmann <lonvia@denofr.de>
Mon, 3 Aug 2020 08:32:39 +0000 (10:32 +0200)
Request and process multiple place_ids at once so that
Postgres can make better use of caching and there are fewer
transactions running.

nominatim/indexer/progress.py
nominatim/nominatim.py

index 8324b6bb23ae22e3ac077bfe1dc0272b843e87c4..456d3eae08aeff9258cc4446f709241451070563 100644 (file)
@@ -21,7 +21,7 @@ class ProgressLogger(object):
         self.total_places = total
         self.done_places = 0
         self.rank_start_time = datetime.now()
-        self.next_info = 50 if log.isEnabledFor(logging.INFO) else total + 1
+        self.next_info = 100 if log.isEnabledFor(logging.INFO) else total + 1
 
     def add(self, num=1):
         """ Mark `num` places as processed. Print a log message if the
index a82217375f19a40f8ddf41b09977163a20f20b7a..e8600ca8dc41bb69cd19491c8a2b5abc01c302d1 100755 (executable)
@@ -67,8 +67,9 @@ class RankRunner(object):
                   WHERE indexed_status > 0 and rank_search = {}
                   ORDER BY geometry_sector""".format(self.rank)
 
-    def sql_index_place(self):
-        return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
+    def sql_index_place(self, ids):
+        return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
+               .format(','.join((str(i) for i in ids)))
 
 
 class InterpolationRunner(object):
@@ -88,9 +89,10 @@ class InterpolationRunner(object):
                   WHERE indexed_status > 0
                   ORDER BY geometry_sector"""
 
-    def sql_index_place(self):
+    def sql_index_place(self, ids):
         return """UPDATE location_property_osmline
-                  SET indexed_status = 0 WHERE place_id = %s"""
+                  SET indexed_status = 0 WHERE place_id IN ({})"""\
+               .format(','.join((str(i) for i in ids)))
 
 
 class DBConnection(object):
@@ -199,13 +201,14 @@ class Indexer(object):
             self.index(RankRunner(rank))
 
         if self.maxrank == 30:
-            self.index(InterpolationRunner())
+            self.index(InterpolationRunner(), 20)
 
-        self.index(RankRunner(self.maxrank))
+        self.index(RankRunner(self.maxrank), 20)
 
-    def index(self, obj):
+    def index(self, obj, batch=1):
         """ Index a single rank or table. `obj` describes the SQL to use
-            for indexing.
+            for indexing. `batch` describes the number of objects that
+            should be processed with a single SQL statement
         """
         log.warning("Starting {}".format(obj.name()))
 
@@ -223,13 +226,16 @@ class Indexer(object):
         cur = self.conn.cursor(name='places')
         cur.execute(obj.sql_get_objects())
 
-        for place in cur:
-            place_id = place[0]
-            log.debug("Processing place {}".format(place_id))
+        while True:
+            places = [p[0] for p in cur.fetchmany(batch)]
+            if len(places) == 0:
+                break
+
+            log.debug("Processing places: {}".format(places))
             thread = next(next_thread)
 
-            thread.perform(obj.sql_index_place(), (place_id,))
-            progress.add()
+            thread.perform(obj.sql_index_place(places))
+            progress.add(len(places))
 
         cur.close()