1 #! /usr/bin/env python3
 
   2 #-----------------------------------------------------------------------------
 
   3 # nominatim - [description]
 
   4 #-----------------------------------------------------------------------------
 
   6 # Indexing tool for the Nominatim database.
 
   8 # Based on C version by Brian Quinion
 
  10 # This program is free software; you can redistribute it and/or
 
  11 # modify it under the terms of the GNU General Public License
 
  12 # as published by the Free Software Foundation; either version 2
 
  13 # of the License, or (at your option) any later version.
 
  15 # This program is distributed in the hope that it will be useful,
 
  16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  18 # GNU General Public License for more details.
 
  20 # You should have received a copy of the GNU General Public License
 
  21 # along with this program; if not, write to the Free Software
 
  22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
  23 #-----------------------------------------------------------------------------
 
  25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
 
  30 from datetime import datetime
 
  32 from psycopg2.extras import wait_select
 
  35 log = logging.getLogger()
 
  37 def make_connection(options, asynchronous=False):
 
  38     params = {'dbname' : options.dbname,
 
  39               'user' : options.user,
 
  40               'password' : options.password,
 
  41               'host' : options.host,
 
  42               'port' : options.port,
 
  43               'async' : asynchronous}
 
  45     return psycopg2.connect(**params)
 
  48 class RankRunner(object):
 
  49     """ Returns SQL commands for indexing one rank within the placex table.
 
  52     def __init__(self, rank):
 
  56         return "rank {}".format(self.rank)
 
  58     def sql_index_sectors(self):
 
  59         return """SELECT geometry_sector, count(*) FROM placex
 
  60                   WHERE rank_search = {} and indexed_status > 0
 
  61                   GROUP BY geometry_sector
 
  62                   ORDER BY geometry_sector""".format(self.rank)
 
  64     def sql_nosector_places(self):
 
  65         return """SELECT place_id FROM placex
 
  66                   WHERE indexed_status > 0 and rank_search = {}
 
  67                   ORDER BY geometry_sector""".format(self.rank)
 
  69     def sql_sector_places(self):
 
  70         return """SELECT place_id FROM placex
 
  71                   WHERE indexed_status > 0 and rank_search = {}
 
  72                         and geometry_sector = %s""".format(self.rank)
 
  74     def sql_index_place(self):
 
  75         return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
 
  78 class InterpolationRunner(object):
 
  79     """ Returns SQL commands for indexing the address interpolation table
 
  80         location_property_osmline.
 
  84         return "interpolation lines (location_property_osmline)"
 
  86     def sql_index_sectors(self):
 
  87         return """SELECT geometry_sector, count(*) FROM location_property_osmline
 
  88                   WHERE indexed_status > 0
 
  89                   GROUP BY geometry_sector
 
  90                   ORDER BY geometry_sector"""
 
  92     def sql_nosector_places(self):
 
  93         return """SELECT place_id FROM location_property_osmline
 
  94                   WHERE indexed_status > 0
 
  95                   ORDER BY geometry_sector"""
 
  97     def sql_sector_places(self):
 
  98         return """SELECT place_id FROM location_property_osmline
 
  99                   WHERE indexed_status > 0 and geometry_sector = %s
 
 100                   ORDER BY geometry_sector"""
 
 102     def sql_index_place(self):
 
 103         return """UPDATE location_property_osmline
 
 104                   SET indexed_status = 0 WHERE place_id = %s"""
 
 107 class DBConnection(object):
 
 108     """ A single non-blocking database connection.
 
 111     def __init__(self, options):
 
 112         self.current_query = None
 
 113         self.current_params = None
 
 119         if self.conn is not None:
 
 123         self.conn = make_connection(options, asynchronous=True)
 
 126         self.cursor = self.conn.cursor()
 
 127         # Disable JIT and parallel workers as they are known to cause problems.
 
 128         # Update pg_settings instead of using SET because it does not yield
 
 129         # errors on older versions of Postgres where the settings are not
 
 132             """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
 
 133                 UPDATE pg_settings SET setting = 0 
 
 134                    WHERE name = 'max_parallel_workers_per_gather';""")
 
 138         """ Block until any pending operation is done.
 
 142                 wait_select(self.conn)
 
 143                 self.current_query = None
 
 145             except psycopg2.extensions.TransactionRollbackError as e:
 
 146                 if e.pgcode == '40P01':
 
 147                     log.info("Deadlock detected (params = {}), retry."
 
 148                               .format(self.current_params))
 
 149                     self.cursor.execute(self.current_query, self.current_params)
 
 152             except psycopg2.errors.DeadlockDetected:
 
 153                 self.cursor.execute(self.current_query, self.current_params)
 
 155     def perform(self, sql, args=None):
 
 156         """ Send SQL query to the server. Returns immediately without
 
 159         self.current_query = sql
 
 160         self.current_params = args
 
 161         self.cursor.execute(sql, args)
 
 164         """ File descriptor to wait for. (Makes this class select()able.)
 
 166         return self.conn.fileno()
 
 169         """ Check if the connection is available for a new query.
 
 171             Also checks if the previous query has run into a deadlock.
 
 172             If so, then the previous query is repeated.
 
 174         if self.current_query is None:
 
 178             if self.conn.poll() == psycopg2.extensions.POLL_OK:
 
 179                 self.current_query = None
 
 181         except psycopg2.extensions.TransactionRollbackError as e:
 
 182             if e.pgcode == '40P01':
 
 183                 log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
 
 184                 self.cursor.execute(self.current_query, self.current_params)
 
 187         except psycopg2.errors.DeadlockDetected:
 
 188             self.cursor.execute(self.current_query, self.current_params)
 
 193 class Indexer(object):
 
 194     """ Main indexing routine.
 
 197     def __init__(self, options):
 
 198         self.minrank = max(0, options.minrank)
 
 199         self.maxrank = min(30, options.maxrank)
 
 200         self.conn = make_connection(options)
 
 201         self.threads = [DBConnection(options) for i in range(options.threads)]
 
 204         """ Run indexing over the entire database.
 
 206         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
 
 207                  self.minrank, self.maxrank, len(self.threads)))
 
 209         for rank in range(self.minrank, self.maxrank):
 
 210             self.index(RankRunner(rank))
 
 212         if self.maxrank == 30:
 
 213             self.index(InterpolationRunner())
 
 215         self.index(RankRunner(self.maxrank))
 
 217     def index(self, obj):
 
 218         """ Index a single rank or table. `obj` describes the SQL to use
 
 221         log.warning("Starting {}".format(obj.name()))
 
 223         cur = self.conn.cursor(name='main')
 
 224         cur.execute(obj.sql_index_sectors())
 
 229         log.debug("Total number of rows; {}".format(total_tuples))
 
 231         cur.scroll(0, mode='absolute')
 
 233         next_thread = self.find_free_thread()
 
 235         rank_start_time = datetime.now()
 
 237         sector_sql = obj.sql_sector_places()
 
 238         index_sql = obj.sql_index_place()
 
 239         min_grouped_tuples = total_tuples - len(self.threads) * 1000
 
 241         next_info = 100 if log.isEnabledFor(logging.INFO) else total_tuples + 1
 
 246             # Should we do the remaining ones together?
 
 247             do_all = done_tuples > min_grouped_tuples
 
 249             pcur = self.conn.cursor(name='places')
 
 252                 pcur.execute(obj.sql_nosector_places())
 
 254                 pcur.execute(sector_sql, (sector, ))
 
 258                 log.debug("Processing place {}".format(place_id))
 
 259                 thread = next(next_thread)
 
 261                 thread.perform(index_sql, (place_id,))
 
 264                 if done_tuples >= next_info:
 
 266                     done_time = (now - rank_start_time).total_seconds()
 
 267                     tuples_per_sec = done_tuples / done_time
 
 268                     log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}"
 
 269                            .format(done_tuples, int(done_time),
 
 270                                    tuples_per_sec, obj.name(),
 
 271                                    (total_tuples - done_tuples)/tuples_per_sec))
 
 272                     next_info += int(tuples_per_sec)
 
 281         for t in self.threads:
 
 284         rank_end_time = datetime.now()
 
 285         diff_seconds = (rank_end_time-rank_start_time).total_seconds()
 
 287         log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format(
 
 288                  done_tuples, total_tuples, int(diff_seconds),
 
 289                  done_tuples/diff_seconds, obj.name()))
 
 291     def find_free_thread(self):
 
 292         """ Generator that returns the next connection that is free for
 
 304             # refresh the connections occasionaly to avoid potential
 
 305             # memory leaks in Postgresql.
 
 306             if command_stat > 100000:
 
 307                 for t in self.threads:
 
 308                     while not t.is_done():
 
 314                 ready, _, _ = select.select(self.threads, [], [])
 
 316         assert False, "Unreachable code"
 
 319 def nominatim_arg_parser():
 
 320     """ Setup the command-line parser for the tool.
 
 323         return re.sub("\s\s+" , " ", s)
 
 325     p = ArgumentParser(description="Indexing tool for Nominatim.",
 
 326                        formatter_class=RawDescriptionHelpFormatter)
 
 328     p.add_argument('-d', '--database',
 
 329                    dest='dbname', action='store', default='nominatim',
 
 330                    help='Name of the PostgreSQL database to connect to.')
 
 331     p.add_argument('-U', '--username',
 
 332                    dest='user', action='store',
 
 333                    help='PostgreSQL user name.')
 
 334     p.add_argument('-W', '--password',
 
 335                    dest='password_prompt', action='store_true',
 
 336                    help='Force password prompt.')
 
 337     p.add_argument('-H', '--host',
 
 338                    dest='host', action='store',
 
 339                    help='PostgreSQL server hostname or socket location.')
 
 340     p.add_argument('-P', '--port',
 
 341                    dest='port', action='store',
 
 342                    help='PostgreSQL server port')
 
 343     p.add_argument('-r', '--minrank',
 
 344                    dest='minrank', type=int, metavar='RANK', default=0,
 
 345                    help='Minimum/starting rank.')
 
 346     p.add_argument('-R', '--maxrank',
 
 347                    dest='maxrank', type=int, metavar='RANK', default=30,
 
 348                    help='Maximum/finishing rank.')
 
 349     p.add_argument('-t', '--threads',
 
 350                    dest='threads', type=int, metavar='NUM', default=1,
 
 351                    help='Number of threads to create for indexing.')
 
 352     p.add_argument('-v', '--verbose',
 
 353                    dest='loglevel', action='count', default=0,
 
 354                    help='Increase verbosity')
 
 358 if __name__ == '__main__':
 
 359     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
 
 361     options = nominatim_arg_parser().parse_args(sys.argv[1:])
 
 363     log.setLevel(max(3 - options.loglevel, 0) * 10)
 
 365     options.password = None
 
 366     if options.password_prompt:
 
 367         password = getpass.getpass("Database password: ")
 
 368         options.password = password
 
 370     Indexer(options).run()