import logging
import select
-import psycopg2
-
from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
+from nominatim.db.connection import connect
# Logger used by the indexer. getLogger() without a name returns the root logger.
LOG = logging.getLogger()
def __init__(self, dsn, num_threads):
    """ Create a new indexer.

        `dsn` is the connection string used to open database
        connections. `num_threads` is the number of worker
        connections that `_setup_connections()` creates.
    """
    self.dsn = dsn
    self.num_threads = num_threads
    # Worker connections: filled by _setup_connections(), emptied again
    # by _close_connections().
    self.threads = []
def _setup_connections(self):
    """ Open `num_threads` worker connections (DBConnection) to `dsn`.

        The previous contents of `self.threads` are replaced, not closed.
    """
    self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
def _close_connections(self):
    """ Close every worker connection in `self.threads` and reset the
        list to empty.
    """
    for thread in self.threads:
        thread.close()
    self.threads = []
database will be analysed at the appropriate places to
ensure that database statistics are updated.
"""
- with psycopg2.connect(self.dsn) as conn:
+ with connect(self.dsn) as conn:
conn.autocommit = True
if analyse:
try:
for rank in range(max(minrank, 4), min(maxrank, 26)):
- self.index(runners.BoundaryRunner(rank))
+ self._index(runners.BoundaryRunner(rank))
finally:
self._close_connections()
try:
for rank in range(max(1, minrank), maxrank):
- self.index(runners.RankRunner(rank))
+ self._index(runners.RankRunner(rank))
if maxrank == 30:
- self.index(runners.RankRunner(0))
- self.index(runners.InterpolationRunner(), 20)
- self.index(runners.RankRunner(30), 20)
+ self._index(runners.RankRunner(0))
+ self._index(runners.InterpolationRunner(), 20)
+ self._index(runners.RankRunner(30), 20)
else:
- self.index(runners.RankRunner(maxrank))
+ self._index(runners.RankRunner(maxrank))
finally:
self._close_connections()
self._setup_connections()
try:
- self.index(runners.PostcodeRunner(), 20)
+ self._index(runners.PostcodeRunner(), 20)
finally:
self._close_connections()
def update_status_table(self):
    """ Update the status in the status table to 'indexed'.

        Sets `indexed = true` on all rows of `import_status` and commits.
    """
    # NOTE(review): connect() is used as a context manager here in place
    # of the former explicit close(); presumably it closes the connection
    # on exit -- confirm in nominatim.db.connection.
    with connect(self.dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('UPDATE import_status SET indexed = true')
        conn.commit()
def _index(self, runner, batch=1):
    """ Index a single rank or table.

        `runner` describes the SQL to use for indexing. It must provide
        `name()`, `sql_count_objects()`, `sql_get_objects()` and
        `sql_index_place(places)`. `batch` is the number of objects that
        are processed with a single SQL statement.

        A separate connection reads the list of objects. The indexing
        statements are dispatched to the worker connections in
        `self.threads`; callers are expected to run
        `_setup_connections()` first.
    """
    LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

    with connect(self.dsn) as conn:
        with conn.cursor() as cur:
            total_tuples = cur.scalar(runner.sql_count_objects())
            LOG.debug("Total number of rows: %i", total_tuples)

        # Finish the count query's transaction before the server-side
        # cursor below starts its own.
        conn.commit()

        progress = ProgressLogger(runner.name(), total_tuples)

        if total_tuples > 0:
            # Passing a name makes this a psycopg2 server-side cursor, so
            # the rows are fetched in chunks of `batch` via fetchmany().
            with conn.cursor(name='places') as cur:
                cur.execute(runner.sql_get_objects())

                next_thread = self.find_free_thread()
                while True:
                    places = [p[0] for p in cur.fetchmany(batch)]
                    if not places:
                        break

                    LOG.debug("Processing places: %s", str(places))
                    thread = next(next_thread)

                    thread.perform(runner.sql_index_place(places))
                    progress.add(len(places))

            conn.commit()

            # Wait on every worker connection before reporting completion.
            for thread in self.threads:
                thread.wait()

    progress.done()