2 Main work horse for indexing (computing addresses) the database.
 
   4 # pylint: disable=C0111
 
  10 from .progress import ProgressLogger
 
  11 from ..db.async_connection import DBConnection
 
  13 LOG = logging.getLogger()
 
  16     """ Returns SQL commands for indexing one rank within the placex table.
 
  19     def __init__(self, rank):
 
  23         return "rank {}".format(self.rank)
 
  25     def sql_count_objects(self):
 
  26         return """SELECT count(*) FROM placex
 
  27                   WHERE rank_address = {} and indexed_status > 0
 
  30     def sql_get_objects(self):
 
  31         return """SELECT place_id FROM placex
 
  32                   WHERE indexed_status > 0 and rank_address = {}
 
  33                   ORDER BY geometry_sector""".format(self.rank)
 
  36     def sql_index_place(ids):
 
  37         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
 
  38                .format(','.join((str(i) for i in ids)))
 
  41 class InterpolationRunner:
 
  42     """ Returns SQL commands for indexing the address interpolation table
 
  43         location_property_osmline.
 
  48         return "interpolation lines (location_property_osmline)"
 
  51     def sql_count_objects():
 
  52         return """SELECT count(*) FROM location_property_osmline
 
  53                   WHERE indexed_status > 0"""
 
  56     def sql_get_objects():
 
  57         return """SELECT place_id FROM location_property_osmline
 
  58                   WHERE indexed_status > 0
 
  59                   ORDER BY geometry_sector"""
 
  62     def sql_index_place(ids):
 
  63         return """UPDATE location_property_osmline
 
  64                   SET indexed_status = 0 WHERE place_id IN ({})
 
  65                """.format(','.join((str(i) for i in ids)))
 
  68     """ Returns SQL commands for indexing the administrative boundaries
 
  72     def __init__(self, rank):
 
  76         return "boundaries rank {}".format(self.rank)
 
  78     def sql_count_objects(self):
 
  79         return """SELECT count(*) FROM placex
 
  80                   WHERE indexed_status > 0
 
  82                     AND class = 'boundary' and type = 'administrative'
 
  85     def sql_get_objects(self):
 
  86         return """SELECT place_id FROM placex
 
  87                   WHERE indexed_status > 0 and rank_search = {}
 
  88                         and class = 'boundary' and type = 'administrative'
 
  89                   ORDER BY partition, admin_level
 
  93     def sql_index_place(ids):
 
  94         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
 
  95                .format(','.join((str(i) for i in ids)))
 
  99     """ Provides the SQL commands for indexing the location_postcode table.
 
 104         return "postcodes (location_postcode)"
 
 107     def sql_count_objects():
 
 108         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
 
 111     def sql_get_objects():
 
 112         return """SELECT place_id FROM location_postcode
 
 113                   WHERE indexed_status > 0
 
 114                   ORDER BY country_code, postcode"""
 
 117     def sql_index_place(ids):
 
 118         return """UPDATE location_postcode SET indexed_status = 0
 
 119                   WHERE place_id IN ({})
 
 120                """.format(','.join((str(i) for i in ids)))
 
 123 def _analyse_db_if(conn, condition):
 
 125         with conn.cursor() as cur:
 
 126             cur.execute('ANALYSE')
 
 130     """ Main indexing routine.
 
 133     def __init__(self, dsn, num_threads):
 
 135         self.num_threads = num_threads
 
 140     def _setup_connections(self):
 
 141         self.conn = psycopg2.connect(self.dsn)
 
 142         self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
 
 145     def _close_connections(self):
 
 150         for thread in self.threads:
 
 155     def index_full(self, analyse=True):
 
 156         """ Index the complete database. This will first index boudnaries
 
 157             followed by all other objects. When `analyse` is True, then the
 
 158             database will be analysed at the appropriate places to
 
 159             ensure that database statistics are updated.
 
 161         conn = psycopg2.connect(self.dsn)
 
 164             self.index_by_rank(0, 4)
 
 165             _analyse_db_if(conn, analyse)
 
 167             self.index_boundaries(0, 30)
 
 168             _analyse_db_if(conn, analyse)
 
 170             self.index_by_rank(5, 25)
 
 171             _analyse_db_if(conn, analyse)
 
 173             self.index_by_rank(26, 30)
 
 174             _analyse_db_if(conn, analyse)
 
 176             self.index_postcodes()
 
 177             _analyse_db_if(conn, analyse)
 
 182     def index_boundaries(self, minrank, maxrank):
 
 183         """ Index only administrative boundaries within the given rank range.
 
 185         LOG.warning("Starting indexing boundaries using %s threads",
 
 188         self._setup_connections()
 
 191             for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 192                 self.index(BoundaryRunner(rank))
 
 194             self._close_connections()
 
 196     def index_by_rank(self, minrank, maxrank):
 
 197         """ Index all entries of placex in the given rank range (inclusive)
 
 198             in order of their address rank.
 
 200             When rank 30 is requested then also interpolations and
 
 201             places with address rank 0 will be indexed.
 
 203         maxrank = min(maxrank, 30)
 
 204         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 205                     minrank, maxrank, self.num_threads)
 
 207         self._setup_connections()
 
 210             for rank in range(max(1, minrank), maxrank):
 
 211                 self.index(RankRunner(rank))
 
 214                 self.index(RankRunner(0))
 
 215                 self.index(InterpolationRunner(), 20)
 
 216                 self.index(RankRunner(30), 20)
 
 218                 self.index(RankRunner(maxrank))
 
 220             self._close_connections()
 
 223     def index_postcodes(self):
 
 224         """Index the entries ofthe location_postcode table.
 
 226         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 228         self._setup_connections()
 
 231             self.index(PostcodeRunner(), 20)
 
 233             self._close_connections()
 
 235     def update_status_table(self):
 
 236         """ Update the status in the status table to 'indexed'.
 
 238         conn = psycopg2.connect(self.dsn)
 
 241             with conn.cursor() as cur:
 
 242                 cur.execute('UPDATE import_status SET indexed = true')
 
 248     def index(self, obj, batch=1):
 
 249         """ Index a single rank or table. `obj` describes the SQL to use
 
 250             for indexing. `batch` describes the number of objects that
 
 251             should be processed with a single SQL statement
 
 253         LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
 
 255         cur = self.conn.cursor()
 
 256         cur.execute(obj.sql_count_objects())
 
 258         total_tuples = cur.fetchone()[0]
 
 259         LOG.debug("Total number of rows: %i", total_tuples)
 
 263         progress = ProgressLogger(obj.name(), total_tuples)
 
 266             cur = self.conn.cursor(name='places')
 
 267             cur.execute(obj.sql_get_objects())
 
 269             next_thread = self.find_free_thread()
 
 271                 places = [p[0] for p in cur.fetchmany(batch)]
 
 275                 LOG.debug("Processing places: %s", str(places))
 
 276                 thread = next(next_thread)
 
 278                 thread.perform(obj.sql_index_place(places))
 
 279                 progress.add(len(places))
 
 283             for thread in self.threads:
 
 288     def find_free_thread(self):
 
 289         """ Generator that returns the next connection that is free for
 
 301             # refresh the connections occasionaly to avoid potential
 
 302             # memory leaks in Postgresql.
 
 303             if command_stat > 100000:
 
 304                 for thread in self.threads:
 
 305                     while not thread.is_done():
 
 311                 ready, _, _ = select.select(self.threads, [], [])
 
 313         assert False, "Unreachable code"