X-Git-Url: https://git.openstreetmap.org/nominatim.git/blobdiff_plain/d89000cc3de6ecdca64e87c05836f6d4cc25b7d1..2323923becb127d01636a6eadda33f95a1e80379:/nominatim/nominatim.py diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index b29bf343..a8221737 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -32,6 +32,8 @@ import psycopg2 from psycopg2.extras import wait_select import select +from indexer.progress import ProgressLogger + log = logging.getLogger() def make_connection(options, asynchronous=False): @@ -55,22 +57,16 @@ class RankRunner(object): def name(self): return "rank {}".format(self.rank) - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM placex + def sql_count_objects(self): + return """SELECT count(*) FROM placex WHERE rank_search = {} and indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""".format(self.rank) + """.format(self.rank) - def sql_nosector_places(self): + def sql_get_objects(self): return """SELECT place_id FROM placex WHERE indexed_status > 0 and rank_search = {} ORDER BY geometry_sector""".format(self.rank) - def sql_sector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} - and geometry_sector = %s""".format(self.rank) - def sql_index_place(self): return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s" @@ -83,22 +79,15 @@ class InterpolationRunner(object): def name(self): return "interpolation lines (location_property_osmline)" - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM location_property_osmline - WHERE indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""" + def sql_count_objects(self): + return """SELECT count(*) FROM location_property_osmline + WHERE indexed_status > 0""" - def sql_nosector_places(self): + def sql_get_objects(self): return """SELECT place_id FROM location_property_osmline WHERE indexed_status > 0 ORDER BY geometry_sector""" - def sql_sector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" - def sql_index_place(self): return """UPDATE location_property_osmline SET indexed_status = 0 WHERE place_id = %s""" @@ -220,73 +209,34 @@ class Indexer(object): """ log.warning("Starting {}".format(obj.name())) - cur = self.conn.cursor(name='main') - cur.execute(obj.sql_index_sectors()) + cur = self.conn.cursor() + cur.execute(obj.sql_count_objects()) - total_tuples = 0 - for r in cur: - total_tuples += r[1] - log.debug("Total number of rows; {}".format(total_tuples)) + total_tuples = cur.fetchone()[0] + log.debug("Total number of rows: {}".format(total_tuples)) - cur.scroll(0, mode='absolute') + cur.close() next_thread = self.find_free_thread() - done_tuples = 0 - rank_start_time = datetime.now() + progress = ProgressLogger(obj.name(), total_tuples) - sector_sql = obj.sql_sector_places() - index_sql = obj.sql_index_place() - min_grouped_tuples = total_tuples - len(self.threads) * 1000 + cur = self.conn.cursor(name='places') + cur.execute(obj.sql_get_objects()) - next_info = 100 if log.isEnabledFor(logging.INFO) else total_tuples + 1 + for place in cur: + place_id = place[0] + log.debug("Processing place {}".format(place_id)) + thread = next(next_thread) - for r in cur: - sector = r[0] - - # Should we do the remaining ones together? - do_all = done_tuples > min_grouped_tuples - - pcur = self.conn.cursor(name='places') - - if do_all: - pcur.execute(obj.sql_nosector_places()) - else: - pcur.execute(sector_sql, (sector, )) - - for place in pcur: - place_id = place[0] - log.debug("Processing place {}".format(place_id)) - thread = next(next_thread) - - thread.perform(index_sql, (place_id,)) - done_tuples += 1 - - if done_tuples >= next_info: - now = datetime.now() - done_time = (now - rank_start_time).total_seconds() - tuples_per_sec = done_tuples / done_time - log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}" - .format(done_tuples, int(done_time), - tuples_per_sec, obj.name(), - (total_tuples - done_tuples)/tuples_per_sec)) - next_info += int(tuples_per_sec) - - pcur.close() - - if do_all: - break + thread.perform(obj.sql_index_place(), (place_id,)) + progress.add() cur.close() for t in self.threads: t.wait() - rank_end_time = datetime.now() - diff_seconds = (rank_end_time-rank_start_time).total_seconds() - - log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format( - done_tuples, total_tuples, int(diff_seconds), - done_tuples/diff_seconds, obj.name())) + progress.done() def find_free_thread(self): """ Generator that returns the next connection that is free for