From 2323923becb127d01636a6eadda33f95a1e80379 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Wed, 29 Jul 2020 16:00:20 +0200 Subject: [PATCH] indexer: move progress tracker into separate class --- nominatim/indexer/__init__.py | 0 nominatim/indexer/progress.py | 52 +++++++++++++++++++++ nominatim/nominatim.py | 87 ++++++++++------------------------- 3 files changed, 77 insertions(+), 62 deletions(-) create mode 100644 nominatim/indexer/__init__.py create mode 100644 nominatim/indexer/progress.py diff --git a/nominatim/indexer/__init__.py b/nominatim/indexer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nominatim/indexer/progress.py b/nominatim/indexer/progress.py new file mode 100644 index 00000000..8324b6bb --- /dev/null +++ b/nominatim/indexer/progress.py @@ -0,0 +1,52 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. +# Copyright (C) 2020 Sarah Hoffmann + +import logging +from datetime import datetime + +log = logging.getLogger() + +class ProgressLogger(object): + """ Tracks and prints progress for the indexing process. + `name` is the name of the indexing step being tracked. + `total` sets up the total number of items that need processing. + `log_interval` denotes the interval in seconds at which progres + should be reported. + """ + + def __init__(self, name, total, log_interval=1): + self.name = name + self.total_places = total + self.done_places = 0 + self.rank_start_time = datetime.now() + self.next_info = 50 if log.isEnabledFor(logging.INFO) else total + 1 + + def add(self, num=1): + """ Mark `num` places as processed. Print a log message if the + logging is at least info and the log interval has past. + """ + self.done_places += num + + if self.done_places >= self.next_info: + now = datetime.now() + done_time = (now - self.rank_start_time).total_seconds() + places_per_sec = self.done_places / done_time + eta = (self.total_places - self.done_places)/places_per_sec + + log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}" + .format(self.done_places, int(done_time), + places_per_sec, self.name, eta)) + + self.next_info += int(places_per_sec) + + def done(self): + """ Print final staticstics about the progress. + """ + rank_end_time = datetime.now() + diff_seconds = (rank_end_time-self.rank_start_time).total_seconds() + + log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format( + self.done_places, self.total_places, int(diff_seconds), + self.done_places/diff_seconds, self.name)) diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index f87203af..a8221737 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -32,6 +32,8 @@ import psycopg2 from psycopg2.extras import wait_select import select +from indexer.progress import ProgressLogger + log = logging.getLogger() def make_connection(options, asynchronous=False): @@ -55,22 +57,16 @@ class RankRunner(object): def name(self): return "rank {}".format(self.rank) - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM placex + def sql_count_objects(self): + return """SELECT count(*) FROM placex WHERE rank_search = {} and indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""".format(self.rank) + """.format(self.rank) - def sql_nosector_places(self): + def sql_get_objects(self): return """SELECT place_id FROM placex WHERE indexed_status > 0 and rank_search = {} ORDER BY geometry_sector""".format(self.rank) - def sql_sector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} - and geometry_sector = %s""".format(self.rank) - def sql_index_place(self): return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s" @@ -83,22 +79,15 @@ class InterpolationRunner(object): def name(self): return "interpolation lines (location_property_osmline)" - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM location_property_osmline - WHERE indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""" + def sql_count_objects(self): + return """SELECT count(*) FROM location_property_osmline + WHERE indexed_status > 0""" - def sql_nosector_places(self): + def sql_get_objects(self): return """SELECT place_id FROM location_property_osmline WHERE indexed_status > 0 ORDER BY geometry_sector""" - def sql_sector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" - def sql_index_place(self): return """UPDATE location_property_osmline SET indexed_status = 0 WHERE place_id = %s""" @@ -220,60 +209,34 @@ class Indexer(object): """ log.warning("Starting {}".format(obj.name())) - cur = self.conn.cursor(name='main') - cur.execute(obj.sql_index_sectors()) + cur = self.conn.cursor() + cur.execute(obj.sql_count_objects()) - total_tuples = 0 - for r in cur: - total_tuples += r[1] - log.debug("Total number of rows; {}".format(total_tuples)) + total_tuples = cur.fetchone()[0] + log.debug("Total number of rows: {}".format(total_tuples)) - cur.scroll(0, mode='absolute') + cur.close() next_thread = self.find_free_thread() - done_tuples = 0 - rank_start_time = datetime.now() - - min_grouped_tuples = total_tuples - len(self.threads) * 1000 + progress = ProgressLogger(obj.name(), total_tuples) - next_info = 100 if log.isEnabledFor(logging.INFO) else total_tuples + 1 + cur = self.conn.cursor(name='places') + cur.execute(obj.sql_get_objects()) - pcur = self.conn.cursor() + for place in cur: + place_id = place[0] + log.debug("Processing place {}".format(place_id)) + thread = next(next_thread) - for r in cur: - sector = r[0] - pcur.execute(obj.sql_sector_places(), (sector, )) + thread.perform(obj.sql_index_place(), (place_id,)) + progress.add() - for place in pcur: - place_id = place[0] - log.debug("Processing place {}".format(place_id)) - thread = next(next_thread) - - thread.perform(obj.sql_index_place(), (place_id,)) - done_tuples += 1 - - if done_tuples >= next_info: - now = datetime.now() - done_time = (now - rank_start_time).total_seconds() - tuples_per_sec = done_tuples / done_time - log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}" - .format(done_tuples, int(done_time), - tuples_per_sec, obj.name(), - (total_tuples - done_tuples)/tuples_per_sec)) - next_info += int(tuples_per_sec) - - pcur.close() cur.close() for t in self.threads: t.wait() - rank_end_time = datetime.now() - diff_seconds = (rank_end_time-rank_start_time).total_seconds() - - log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format( - done_tuples, total_tuples, int(diff_seconds), - done_tuples/diff_seconds, obj.name())) + progress.done() def find_free_thread(self): """ Generator that returns the next connection that is free for -- 2.45.1