From: Sarah Hoffmann Date: Wed, 29 Jul 2020 15:20:30 +0000 (+0200) Subject: indexer: allow batch processing of places X-Git-Tag: v3.6.0~100^2 X-Git-Url: https://git.openstreetmap.org/nominatim.git/commitdiff_plain/5be084e0f5b19a798f32c6310060c1a3c206211f indexer: allow batch processing of places Request and process multiple place_ids at once so that Postgres can make better use of caching and there are less transactions running. --- diff --git a/nominatim/indexer/progress.py b/nominatim/indexer/progress.py index 8324b6bb..456d3eae 100644 --- a/nominatim/indexer/progress.py +++ b/nominatim/indexer/progress.py @@ -21,7 +21,7 @@ class ProgressLogger(object): self.total_places = total self.done_places = 0 self.rank_start_time = datetime.now() - self.next_info = 50 if log.isEnabledFor(logging.INFO) else total + 1 + self.next_info = 100 if log.isEnabledFor(logging.INFO) else total + 1 def add(self, num=1): """ Mark `num` places as processed. Print a log message if the diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index a8221737..e8600ca8 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -67,8 +67,9 @@ class RankRunner(object): WHERE indexed_status > 0 and rank_search = {} ORDER BY geometry_sector""".format(self.rank) - def sql_index_place(self): - return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s" + def sql_index_place(self, ids): + return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ + .format(','.join((str(i) for i in ids))) class InterpolationRunner(object): @@ -88,9 +89,10 @@ class InterpolationRunner(object): WHERE indexed_status > 0 ORDER BY geometry_sector""" - def sql_index_place(self): + def sql_index_place(self, ids): return """UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id = %s""" + SET indexed_status = 0 WHERE place_id IN ({})"""\ + .format(','.join((str(i) for i in ids))) class DBConnection(object): @@ -199,13 +201,14 @@ class Indexer(object): self.index(RankRunner(rank)) if self.maxrank == 30: - self.index(InterpolationRunner()) + self.index(InterpolationRunner(), 20) - self.index(RankRunner(self.maxrank)) + self.index(RankRunner(self.maxrank), 20) - def index(self, obj): + def index(self, obj, batch=1): """ Index a single rank or table. `obj` describes the SQL to use - for indexing. + for indexing. `batch` describes the number of objects that + should be processed with a single SQL statement """ log.warning("Starting {}".format(obj.name())) @@ -223,13 +226,16 @@ class Indexer(object): cur = self.conn.cursor(name='places') cur.execute(obj.sql_get_objects()) - for place in cur: - place_id = place[0] - log.debug("Processing place {}".format(place_id)) + while True: + places = [p[0] for p in cur.fetchmany(batch)] + if len(places) == 0: + break + + log.debug("Processing places: {}".format(places)) thread = next(next_thread) - thread.perform(obj.sql_index_place(), (place_id,)) - progress.add() + thread.perform(obj.sql_index_place(places)) + progress.add(len(places)) cur.close()