From: Sarah Hoffmann Date: Fri, 26 Feb 2021 11:10:54 +0000 (+0100) Subject: properly close connections of indexer after use X-Git-Tag: v3.7.0~29^2~7 X-Git-Url: https://git.openstreetmap.org/nominatim.git/commitdiff_plain/3ee8d9fa75011a5e259c3b58fce671b4ff0c35fc properly close connections of indexer after use --- diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index 61971497..d997e522 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -124,8 +124,25 @@ class Indexer: """ def __init__(self, dsn, num_threads): - self.conn = psycopg2.connect(dsn) - self.threads = [DBConnection(dsn) for _ in range(num_threads)] + self.dsn = dsn + self.num_threads = num_threads + self.conn = None + self.threads = [] + + + def _setup_connections(self): + self.conn = psycopg2.connect(self.dsn) + self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)] + + + def _close_connections(self): + if self.conn: + self.conn.close() + self.conn = None + + for thread in self.threads: + thread.close() + self.threads = [] def index_full(self, analyse=True): @@ -134,34 +151,44 @@ class Indexer: database will be analysed at the appropriate places to ensure that database statistics are updated. 
""" - self.index_by_rank(0, 4) - self._analyse_db_if(analyse) + conn = psycopg2.connect(self.dsn) + + try: + self.index_by_rank(0, 4) + self._analyse_db_if(conn, analyse) - self.index_boundaries(0, 30) - self._analyse_db_if(analyse) + self.index_boundaries(0, 30) + self._analyse_db_if(conn, analyse) - self.index_by_rank(5, 25) - self._analyse_db_if(analyse) + self.index_by_rank(5, 25) + self._analyse_db_if(conn, analyse) - self.index_by_rank(26, 30) - self._analyse_db_if(analyse) + self.index_by_rank(26, 30) + self._analyse_db_if(conn, analyse) - self.index_postcodes() - self._analyse_db_if(analyse) + self.index_postcodes() + self._analyse_db_if(conn, analyse) + finally: + conn.close() - def _analyse_db_if(self, condition): + def _analyse_db_if(self, conn, condition): if condition: - with self.conn.cursor() as cur: + with conn.cursor() as cur: cur.execute('ANALYSE') def index_boundaries(self, minrank, maxrank): """ Index only administrative boundaries within the given rank range. """ LOG.warning("Starting indexing boundaries using %s threads", - len(self.threads)) + self.num_threads) + + self._setup_connections() - for rank in range(max(minrank, 4), min(maxrank, 26)): - self.index(BoundaryRunner(rank)) + try: + for rank in range(max(minrank, 4), min(maxrank, 26)): + self.index(BoundaryRunner(rank)) + finally: + self._close_connections() def index_by_rank(self, minrank, maxrank): """ Index all entries of placex in the given rank range (inclusive) @@ -172,30 +199,48 @@ class Indexer: """ maxrank = min(maxrank, 30) LOG.warning("Starting indexing rank (%i to %i) using %i threads", - minrank, maxrank, len(self.threads)) + minrank, maxrank, self.num_threads) - for rank in range(max(1, minrank), maxrank): - self.index(RankRunner(rank)) + self._setup_connections() - if maxrank == 30: - self.index(RankRunner(0)) - self.index(InterpolationRunner(), 20) - self.index(RankRunner(30), 20) - else: - self.index(RankRunner(maxrank)) + try: + for rank in range(max(1, minrank), 
maxrank): + self.index(RankRunner(rank)) + + if maxrank == 30: + self.index(RankRunner(0)) + self.index(InterpolationRunner(), 20) + self.index(RankRunner(30), 20) + else: + self.index(RankRunner(maxrank)) + finally: + self._close_connections() def index_postcodes(self): """Index the entries ofthe location_postcode table. """ - self.index(PostcodeRunner(), 20) + LOG.warning("Starting indexing postcodes using %s threads", self.num_threads) + + self._setup_connections() + + try: + self.index(PostcodeRunner(), 20) + finally: + self._close_connections() def update_status_table(self): """ Update the status in the status table to 'indexed'. """ - with self.conn.cursor() as cur: - cur.execute('UPDATE import_status SET indexed = true') - self.conn.commit() + conn = psycopg2.connect(self.dsn) + + try: + with conn.cursor() as cur: + cur.execute('UPDATE import_status SET indexed = true') + + conn.commit() + finally: + conn.close() def index(self, obj, batch=1): """ Index a single rank or table. `obj` describes the SQL to use