import logging
import select
+import psycopg2
+
from .progress import ProgressLogger
-from db.async_connection import DBConnection, make_connection
+from ..db.async_connection import DBConnection
LOG = logging.getLogger()
@staticmethod
def sql_index_place(ids):
return """UPDATE location_property_osmline
- SET indexed_status = 0 WHERE place_id IN ({})"""\
- .format(','.join((str(i) for i in ids)))
+ SET indexed_status = 0 WHERE place_id IN ({})
+ """.format(','.join((str(i) for i in ids)))
class BoundaryRunner:
""" Returns SQL commands for indexing the administrative boundaries
return """SELECT count(*) FROM placex
WHERE indexed_status > 0
AND rank_search = {}
- AND class = 'boundary' and type = 'administrative'""".format(self.rank)
+ AND class = 'boundary' and type = 'administrative'
+ """.format(self.rank)
def sql_get_objects(self):
return """SELECT place_id FROM placex
WHERE indexed_status > 0 and rank_search = {}
and class = 'boundary' and type = 'administrative'
- ORDER BY partition, admin_level""".format(self.rank)
+ ORDER BY partition, admin_level
+ """.format(self.rank)
@staticmethod
def sql_index_place(ids):
return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
.format(','.join((str(i) for i in ids)))
+
+class PostcodeRunner:
+ """ Provides the SQL commands for indexing the location_postcode table.
+ """
+
+ @staticmethod
+ def name():
+ return "postcodes (location_postcode)"
+
+ @staticmethod
+ def sql_count_objects():
+ return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
+
+ @staticmethod
+ def sql_get_objects():
+ return """SELECT place_id FROM location_postcode
+ WHERE indexed_status > 0
+ ORDER BY country_code, postcode"""
+
+ @staticmethod
+ def sql_index_place(ids):
+ return """UPDATE location_postcode SET indexed_status = 0
+ WHERE place_id IN ({})
+ """.format(','.join((str(i) for i in ids)))
+
+
+def _analyse_db_if(conn, condition):
+ if condition:
+ with conn.cursor() as cur:
+ cur.execute('ANALYSE')
+
+
class Indexer:
""" Main indexing routine.
"""
- def __init__(self, opts):
- self.minrank = max(1, opts.minrank)
- self.maxrank = min(30, opts.maxrank)
- self.conn = make_connection(opts)
- self.threads = [DBConnection(opts) for _ in range(opts.threads)]
+ def __init__(self, dsn, num_threads):
+ self.dsn = dsn
+ self.num_threads = num_threads
+ self.conn = None
+ self.threads = []
+
+
+ def _setup_connections(self):
+ self.conn = psycopg2.connect(self.dsn)
+ self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
+
- def index_boundaries(self):
+ def _close_connections(self):
+ if self.conn:
+ self.conn.close()
+ self.conn = None
+
+ for thread in self.threads:
+ thread.close()
+ self.threads = []
+
+
+ def index_full(self, analyse=True):
+ """ Index the complete database. This will first index boudnaries
+ followed by all other objects. When `analyse` is True, then the
+ database will be analysed at the appropriate places to
+ ensure that database statistics are updated.
+ """
+ conn = psycopg2.connect(self.dsn)
+ conn.autocommit = True
+
+ try:
+ self.index_by_rank(0, 4)
+ _analyse_db_if(conn, analyse)
+
+ self.index_boundaries(0, 30)
+ _analyse_db_if(conn, analyse)
+
+ self.index_by_rank(5, 25)
+ _analyse_db_if(conn, analyse)
+
+ self.index_by_rank(26, 30)
+ _analyse_db_if(conn, analyse)
+
+ self.index_postcodes()
+ _analyse_db_if(conn, analyse)
+ finally:
+ conn.close()
+
+
+ def index_boundaries(self, minrank, maxrank):
+ """ Index only administrative boundaries within the given rank range.
+ """
LOG.warning("Starting indexing boundaries using %s threads",
- len(self.threads))
+ self.num_threads)
+
+ self._setup_connections()
- for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
- self.index(BoundaryRunner(rank))
+ try:
+ for rank in range(max(minrank, 4), min(maxrank, 26)):
+ self.index(BoundaryRunner(rank))
+ finally:
+ self._close_connections()
- def index_by_rank(self):
- """ Run classic indexing by rank.
+ def index_by_rank(self, minrank, maxrank):
+ """ Index all entries of placex in the given rank range (inclusive)
+ in order of their address rank.
+
+ When rank 30 is requested then also interpolations and
+ places with address rank 0 will be indexed.
"""
+ maxrank = min(maxrank, 30)
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
- self.minrank, self.maxrank, len(self.threads))
+ minrank, maxrank, self.num_threads)
+
+ self._setup_connections()
+
+ try:
+ for rank in range(max(1, minrank), maxrank):
+ self.index(RankRunner(rank))
+
+ if maxrank == 30:
+ self.index(RankRunner(0))
+ self.index(InterpolationRunner(), 20)
+ self.index(RankRunner(30), 20)
+ else:
+ self.index(RankRunner(maxrank))
+ finally:
+ self._close_connections()
+
+
+ def index_postcodes(self):
+ """Index the entries ofthe location_postcode table.
+ """
+ LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
+
+ self._setup_connections()
+
+ try:
+ self.index(PostcodeRunner(), 20)
+ finally:
+ self._close_connections()
+
+ def update_status_table(self):
+ """ Update the status in the status table to 'indexed'.
+ """
+ conn = psycopg2.connect(self.dsn)
- for rank in range(max(1, self.minrank), self.maxrank):
- self.index(RankRunner(rank))
+ try:
+ with conn.cursor() as cur:
+ cur.execute('UPDATE import_status SET indexed = true')
- if self.maxrank == 30:
- self.index(RankRunner(0))
- self.index(InterpolationRunner(), 20)
- self.index(RankRunner(self.maxrank), 20)
- else:
- self.index(RankRunner(self.maxrank))
+ conn.commit()
+ finally:
+ conn.close()
def index(self, obj, batch=1):
""" Index a single rank or table. `obj` describes the SQL to use