From 3ee8d9fa75011a5e259c3b58fce671b4ff0c35fc Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Fri, 26 Feb 2021 12:10:54 +0100 Subject: [PATCH] properly close connections of indexer after use --- nominatim/indexer/indexer.py | 105 +++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 30 deletions(-) diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index 61971497..d997e522 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -124,8 +124,25 @@ class Indexer: """ def __init__(self, dsn, num_threads): - self.conn = psycopg2.connect(dsn) - self.threads = [DBConnection(dsn) for _ in range(num_threads)] + self.dsn = dsn + self.num_threads = num_threads + self.conn = None + self.threads = [] + + + def _setup_connections(self): + self.conn = psycopg2.connect(self.dsn) + self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)] + + + def _close_connections(self): + if self.conn: + self.conn.close() + self.conn = None + + for thread in self.threads: + thread.close() + threads = [] def index_full(self, analyse=True): @@ -134,34 +151,44 @@ class Indexer: database will be analysed at the appropriate places to ensure that database statistics are updated. """ - self.index_by_rank(0, 4) - self._analyse_db_if(analyse) + conn = psycopg2.connect(self.dsn) + + try: + self.index_by_rank(0, 4) + self._analyse_db_if(conn, analyse) - self.index_boundaries(0, 30) - self._analyse_db_if(analyse) + self.index_boundaries(0, 30) + self._analyse_db_if(conn, analyse) - self.index_by_rank(5, 25) - self._analyse_db_if(analyse) + self.index_by_rank(5, 25) + self._analyse_db_if(conn, analyse) - self.index_by_rank(26, 30) - self._analyse_db_if(analyse) + self.index_by_rank(26, 30) + self._analyse_db_if(conn, analyse) - self.index_postcodes() - self._analyse_db_if(analyse) + self.index_postcodes() + self._analyse_db_if(conn, analyse) + finally: + conn.close() - def _analyse_db_if(self, condition): + def _analyse_db_if(self, conn, condition): if condition: - with self.conn.cursor() as cur: + with conn.cursor() as cur: cur.execute('ANALYSE') def index_boundaries(self, minrank, maxrank): """ Index only administrative boundaries within the given rank range. """ LOG.warning("Starting indexing boundaries using %s threads", - len(self.threads)) + self.num_threads) + + self._setup_connections() - for rank in range(max(minrank, 4), min(maxrank, 26)): - self.index(BoundaryRunner(rank)) + try: + for rank in range(max(minrank, 4), min(maxrank, 26)): + self.index(BoundaryRunner(rank)) + finally: + self._close_connections() def index_by_rank(self, minrank, maxrank): """ Index all entries of placex in the given rank range (inclusive) @@ -172,30 +199,48 @@ class Indexer: """ maxrank = min(maxrank, 30) LOG.warning("Starting indexing rank (%i to %i) using %i threads", - minrank, maxrank, len(self.threads)) + minrank, maxrank, self.num_threads) - for rank in range(max(1, minrank), maxrank): - self.index(RankRunner(rank)) + self._setup_connections() - if maxrank == 30: - self.index(RankRunner(0)) - self.index(InterpolationRunner(), 20) - self.index(RankRunner(30), 20) - else: - self.index(RankRunner(maxrank)) + try: + for rank in range(max(1, minrank), maxrank): + self.index(RankRunner(rank)) + + if maxrank == 30: + self.index(RankRunner(0)) + self.index(InterpolationRunner(), 20) + self.index(RankRunner(30), 20) + else: + self.index(RankRunner(maxrank)) + finally: + self._close_connections() def index_postcodes(self): """Index the entries ofthe location_postcode table. """ - self.index(PostcodeRunner(), 20) + LOG.warning("Starting indexing postcodes using %s threads", self.num_threads) + + self._setup_connections() + + try: + self.index(PostcodeRunner(), 20) + finally: + self._close_connections() def update_status_table(self): """ Update the status in the status table to 'indexed'. """ - with self.conn.cursor() as cur: - cur.execute('UPDATE import_status SET indexed = true') - self.conn.commit() + conn = psycopg2.connect(self.dsn) + + try: + with conn.cursor() as cur: + cur.execute('UPDATE import_status SET indexed = true') + + conn.commit() + finally: + conn.close() def index(self, obj, batch=1): """ Index a single rank or table. `obj` describes the SQL to use -- 2.45.1