2 Main work horse for indexing (computing addresses) the database.
 
   4 # pylint: disable=C0111
 
  10 from .progress import ProgressLogger
 
  11 from ..db.async_connection import DBConnection
 
  13 LOG = logging.getLogger()
 
  16     """ Returns SQL commands for indexing one rank within the placex table.
 
  19     def __init__(self, rank):
 
  23         return "rank {}".format(self.rank)
 
  25     def sql_count_objects(self):
 
  26         return """SELECT count(*) FROM placex
 
  27                   WHERE rank_address = {} and indexed_status > 0
 
  30     def sql_get_objects(self):
 
  31         return """SELECT place_id FROM placex
 
  32                   WHERE indexed_status > 0 and rank_address = {}
 
  33                   ORDER BY geometry_sector""".format(self.rank)
 
  36     def sql_index_place(ids):
 
  37         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
 
  38                .format(','.join((str(i) for i in ids)))
 
  41 class InterpolationRunner:
 
  42     """ Returns SQL commands for indexing the address interpolation table
 
  43         location_property_osmline.
 
  48         return "interpolation lines (location_property_osmline)"
 
  51     def sql_count_objects():
 
  52         return """SELECT count(*) FROM location_property_osmline
 
  53                   WHERE indexed_status > 0"""
 
  56     def sql_get_objects():
 
  57         return """SELECT place_id FROM location_property_osmline
 
  58                   WHERE indexed_status > 0
 
  59                   ORDER BY geometry_sector"""
 
  62     def sql_index_place(ids):
 
  63         return """UPDATE location_property_osmline
 
  64                   SET indexed_status = 0 WHERE place_id IN ({})
 
  65                """.format(','.join((str(i) for i in ids)))
 
  68     """ Returns SQL commands for indexing the administrative boundaries
 
  72     def __init__(self, rank):
 
  76         return "boundaries rank {}".format(self.rank)
 
  78     def sql_count_objects(self):
 
  79         return """SELECT count(*) FROM placex
 
  80                   WHERE indexed_status > 0
 
  82                     AND class = 'boundary' and type = 'administrative'
 
  85     def sql_get_objects(self):
 
  86         return """SELECT place_id FROM placex
 
  87                   WHERE indexed_status > 0 and rank_search = {}
 
  88                         and class = 'boundary' and type = 'administrative'
 
  89                   ORDER BY partition, admin_level
 
  93     def sql_index_place(ids):
 
  94         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
 
  95                .format(','.join((str(i) for i in ids)))
 
  99     """ Provides the SQL commands for indexing the location_postcode table.
 
 104         return "postcodes (location_postcode)"
 
 107     def sql_count_objects():
 
 108         return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
 
 111     def sql_get_objects():
 
 112         return """SELECT place_id FROM location_postcode
 
 113                   WHERE indexed_status > 0
 
 114                   ORDER BY country_code, postcode"""
 
 117     def sql_index_place(ids):
 
 118         return """UPDATE location_postcode SET indexed_status = 0
 
 119                   WHERE place_id IN ({})
 
 120                """.format(','.join((str(i) for i in ids)))
 
 123 def _analyse_db_if(conn, condition):
 
 125         with conn.cursor() as cur:
 
 126             cur.execute('ANALYSE')
 
 130     """ Main indexing routine.
 
 133     def __init__(self, dsn, num_threads):
 
 135         self.num_threads = num_threads
 
 140     def _setup_connections(self):
 
 141         self.conn = psycopg2.connect(self.dsn)
 
 142         self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
 
 145     def _close_connections(self):
 
 150         for thread in self.threads:
 
 155     def index_full(self, analyse=True):
 
 156         """ Index the complete database. This will first index boudnaries
 
 157             followed by all other objects. When `analyse` is True, then the
 
 158             database will be analysed at the appropriate places to
 
 159             ensure that database statistics are updated.
 
 161         conn = psycopg2.connect(self.dsn)
 
 162         conn.autocommit = True
 
 165             self.index_by_rank(0, 4)
 
 166             _analyse_db_if(conn, analyse)
 
 168             self.index_boundaries(0, 30)
 
 169             _analyse_db_if(conn, analyse)
 
 171             self.index_by_rank(5, 25)
 
 172             _analyse_db_if(conn, analyse)
 
 174             self.index_by_rank(26, 30)
 
 175             _analyse_db_if(conn, analyse)
 
 177             self.index_postcodes()
 
 178             _analyse_db_if(conn, analyse)
 
 183     def index_boundaries(self, minrank, maxrank):
 
 184         """ Index only administrative boundaries within the given rank range.
 
 186         LOG.warning("Starting indexing boundaries using %s threads",
 
 189         self._setup_connections()
 
 192             for rank in range(max(minrank, 4), min(maxrank, 26)):
 
 193                 self.index(BoundaryRunner(rank))
 
 195             self._close_connections()
 
 197     def index_by_rank(self, minrank, maxrank):
 
 198         """ Index all entries of placex in the given rank range (inclusive)
 
 199             in order of their address rank.
 
 201             When rank 30 is requested then also interpolations and
 
 202             places with address rank 0 will be indexed.
 
 204         maxrank = min(maxrank, 30)
 
 205         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
 
 206                     minrank, maxrank, self.num_threads)
 
 208         self._setup_connections()
 
 211             for rank in range(max(1, minrank), maxrank):
 
 212                 self.index(RankRunner(rank))
 
 215                 self.index(RankRunner(0))
 
 216                 self.index(InterpolationRunner(), 20)
 
 217                 self.index(RankRunner(30), 20)
 
 219                 self.index(RankRunner(maxrank))
 
 221             self._close_connections()
 
 224     def index_postcodes(self):
 
 225         """Index the entries ofthe location_postcode table.
 
 227         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
 229         self._setup_connections()
 
 232             self.index(PostcodeRunner(), 20)
 
 234             self._close_connections()
 
 236     def update_status_table(self):
 
 237         """ Update the status in the status table to 'indexed'.
 
 239         conn = psycopg2.connect(self.dsn)
 
 242             with conn.cursor() as cur:
 
 243                 cur.execute('UPDATE import_status SET indexed = true')
 
 249     def index(self, obj, batch=1):
 
 250         """ Index a single rank or table. `obj` describes the SQL to use
 
 251             for indexing. `batch` describes the number of objects that
 
 252             should be processed with a single SQL statement
 
 254         LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
 
 256         cur = self.conn.cursor()
 
 257         cur.execute(obj.sql_count_objects())
 
 259         total_tuples = cur.fetchone()[0]
 
 260         LOG.debug("Total number of rows: %i", total_tuples)
 
 264         progress = ProgressLogger(obj.name(), total_tuples)
 
 267             cur = self.conn.cursor(name='places')
 
 268             cur.execute(obj.sql_get_objects())
 
 270             next_thread = self.find_free_thread()
 
 272                 places = [p[0] for p in cur.fetchmany(batch)]
 
 276                 LOG.debug("Processing places: %s", str(places))
 
 277                 thread = next(next_thread)
 
 279                 thread.perform(obj.sql_index_place(places))
 
 280                 progress.add(len(places))
 
 284             for thread in self.threads:
 
 289     def find_free_thread(self):
 
 290         """ Generator that returns the next connection that is free for
 
 302             # refresh the connections occasionaly to avoid potential
 
 303             # memory leaks in Postgresql.
 
 304             if command_stat > 100000:
 
 305                 for thread in self.threads:
 
 306                     while not thread.is_done():
 
 312                 ready, _, _ = select.select(self.threads, [], [])
 
 314         assert False, "Unreachable code"