1 #! /usr/bin/env python3
 
   2 #-----------------------------------------------------------------------------
 
   3 # nominatim - [description]
 
   4 #-----------------------------------------------------------------------------
 
   6 # Indexing tool for the Nominatim database.
 
   8 # Based on C version by Brian Quinion
 
  10 # This program is free software; you can redistribute it and/or
 
  11 # modify it under the terms of the GNU General Public License
 
  12 # as published by the Free Software Foundation; either version 2
 
  13 # of the License, or (at your option) any later version.
 
  15 # This program is distributed in the hope that it will be useful,
 
  16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 
  17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  18 # GNU General Public License for more details.
 
  20 # You should have received a copy of the GNU General Public License
 
  21 # along with this program; if not, write to the Free Software
 
  22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 
  23 #-----------------------------------------------------------------------------
 
  25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
 
  30 from datetime import datetime
 
  33 from indexer.progress import ProgressLogger
 
  34 from indexer.db import DBConnection, make_connection
 
  36 log = logging.getLogger()
 
  38 class RankRunner(object):
 
  39     """ Returns SQL commands for indexing one rank within the placex table.
 
  42     def __init__(self, rank):
 
  46         return "rank {}".format(self.rank)
 
  48     def sql_count_objects(self):
 
  49         return """SELECT count(*) FROM placex
 
  50                   WHERE rank_address = {} and indexed_status > 0
 
  53     def sql_get_objects(self):
 
  54         return """SELECT place_id FROM placex
 
  55                   WHERE indexed_status > 0 and rank_address = {}
 
  56                   ORDER BY geometry_sector""".format(self.rank)
 
  58     def sql_index_place(self, ids):
 
  59         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
 
  60                .format(','.join((str(i) for i in ids)))
 
  63 class InterpolationRunner(object):
 
  64     """ Returns SQL commands for indexing the address interpolation table
 
  65         location_property_osmline.
 
  69         return "interpolation lines (location_property_osmline)"
 
  71     def sql_count_objects(self):
 
  72         return """SELECT count(*) FROM location_property_osmline
 
  73                   WHERE indexed_status > 0"""
 
  75     def sql_get_objects(self):
 
  76         return """SELECT place_id FROM location_property_osmline
 
  77                   WHERE indexed_status > 0
 
  78                   ORDER BY geometry_sector"""
 
  80     def sql_index_place(self, ids):
 
  81         return """UPDATE location_property_osmline
 
  82                   SET indexed_status = 0 WHERE place_id IN ({})"""\
 
  83                .format(','.join((str(i) for i in ids)))
 
  85 class BoundaryRunner(object):
 
  86     """ Returns SQL commands for indexing the administrative boundaries
 
  90     def __init__(self, rank):
 
  94         return "boundaries rank {}".format(self.rank)
 
  96     def sql_count_objects(self):
 
  97         return """SELECT count(*) FROM placex
 
  98                   WHERE indexed_status > 0
 
 100                     AND class = 'boundary' and type = 'administrative'""".format(self.rank)
 
 102     def sql_get_objects(self):
 
 103         return """SELECT place_id FROM placex
 
 104                   WHERE indexed_status > 0 and rank_search = {}
 
 105                         and class = 'boundary' and type = 'administrative'
 
 106                   ORDER BY partition, admin_level""".format(self.rank)
 
 108     def sql_index_place(self, ids):
 
 109         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
 
 110                .format(','.join((str(i) for i in ids)))
 
 112 class Indexer(object):
 
 113     """ Main indexing routine.
 
 116     def __init__(self, options):
 
 117         self.minrank = max(1, options.minrank)
 
 118         self.maxrank = min(30, options.maxrank)
 
 119         self.conn = make_connection(options)
 
 120         self.threads = [DBConnection(options) for i in range(options.threads)]
 
 122     def index_boundaries(self):
 
 123         log.warning("Starting indexing boundaries using {} threads".format(
 
 126         for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
 
 127             self.index(BoundaryRunner(rank))
 
 129     def index_by_rank(self):
 
 130         """ Run classic indexing by rank.
 
 132         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
 
 133                  self.minrank, self.maxrank, len(self.threads)))
 
 135         for rank in range(max(1, self.minrank), self.maxrank):
 
 136             self.index(RankRunner(rank))
 
 138         if self.maxrank == 30:
 
 139             self.index(RankRunner(0))
 
 140             self.index(InterpolationRunner(), 20)
 
 141             self.index(RankRunner(self.maxrank), 20)
 
 143             self.index(RankRunner(self.maxrank))
 
 145     def index(self, obj, batch=1):
 
 146         """ Index a single rank or table. `obj` describes the SQL to use
 
 147             for indexing. `batch` describes the number of objects that
 
 148             should be processed with a single SQL statement
 
 150         log.warning("Starting %s (using batch size %s)", obj.name(), batch)
 
 152         cur = self.conn.cursor()
 
 153         cur.execute(obj.sql_count_objects())
 
 155         total_tuples = cur.fetchone()[0]
 
 156         log.debug("Total number of rows: {}".format(total_tuples))
 
 160         progress = ProgressLogger(obj.name(), total_tuples)
 
 163             cur = self.conn.cursor(name='places')
 
 164             cur.execute(obj.sql_get_objects())
 
 166             next_thread = self.find_free_thread()
 
 168                 places = [p[0] for p in cur.fetchmany(batch)]
 
 172                 log.debug("Processing places: {}".format(places))
 
 173                 thread = next(next_thread)
 
 175                 thread.perform(obj.sql_index_place(places))
 
 176                 progress.add(len(places))
 
 180             for t in self.threads:
 
 185     def find_free_thread(self):
 
 186         """ Generator that returns the next connection that is free for
 
 198             # refresh the connections occasionaly to avoid potential
 
 199             # memory leaks in Postgresql.
 
 200             if command_stat > 100000:
 
 201                 for t in self.threads:
 
 202                     while not t.is_done():
 
 208                 ready, _, _ = select.select(self.threads, [], [])
 
 210         assert False, "Unreachable code"
 
 213 def nominatim_arg_parser():
 
 214     """ Setup the command-line parser for the tool.
 
 217         return re.sub("\s\s+" , " ", s)
 
 219     p = ArgumentParser(description="Indexing tool for Nominatim.",
 
 220                        formatter_class=RawDescriptionHelpFormatter)
 
 222     p.add_argument('-d', '--database',
 
 223                    dest='dbname', action='store', default='nominatim',
 
 224                    help='Name of the PostgreSQL database to connect to.')
 
 225     p.add_argument('-U', '--username',
 
 226                    dest='user', action='store',
 
 227                    help='PostgreSQL user name.')
 
 228     p.add_argument('-W', '--password',
 
 229                    dest='password_prompt', action='store_true',
 
 230                    help='Force password prompt.')
 
 231     p.add_argument('-H', '--host',
 
 232                    dest='host', action='store',
 
 233                    help='PostgreSQL server hostname or socket location.')
 
 234     p.add_argument('-P', '--port',
 
 235                    dest='port', action='store',
 
 236                    help='PostgreSQL server port')
 
 237     p.add_argument('-b', '--boundary-only',
 
 238                    dest='boundary_only', action='store_true',
 
 239                    help='Only index administrative boundaries (ignores min/maxrank).')
 
 240     p.add_argument('-r', '--minrank',
 
 241                    dest='minrank', type=int, metavar='RANK', default=0,
 
 242                    help='Minimum/starting rank.')
 
 243     p.add_argument('-R', '--maxrank',
 
 244                    dest='maxrank', type=int, metavar='RANK', default=30,
 
 245                    help='Maximum/finishing rank.')
 
 246     p.add_argument('-t', '--threads',
 
 247                    dest='threads', type=int, metavar='NUM', default=1,
 
 248                    help='Number of threads to create for indexing.')
 
 249     p.add_argument('-v', '--verbose',
 
 250                    dest='loglevel', action='count', default=0,
 
 251                    help='Increase verbosity')
 
 255 if __name__ == '__main__':
 
 256     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
 
 258     options = nominatim_arg_parser().parse_args(sys.argv[1:])
 
 260     log.setLevel(max(3 - options.loglevel, 0) * 10)
 
 262     options.password = None
 
 263     if options.password_prompt:
 
 264         password = getpass.getpass("Database password: ")
 
 265         options.password = password
 
 267     if options.boundary_only:
 
 268         Indexer(options).index_boundaries()
 
 270         Indexer(options).index_by_rank()