]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
nominatim: also index boundaries by rank
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import select
32
33 from indexer.progress import ProgressLogger
34 from indexer.db import DBConnection, make_connection
35
36 log = logging.getLogger()
37
38 class RankRunner(object):
39     """ Returns SQL commands for indexing one rank within the placex table.
40     """
41
42     def __init__(self, rank):
43         self.rank = rank
44
45     def name(self):
46         return "rank {}".format(self.rank)
47
48     def sql_count_objects(self):
49         return """SELECT count(*) FROM placex
50                   WHERE rank_search = {} and indexed_status > 0
51                """.format(self.rank)
52
53     def sql_get_objects(self):
54         return """SELECT place_id FROM placex
55                   WHERE indexed_status > 0 and rank_search = {}
56                   ORDER BY geometry_sector""".format(self.rank)
57
58     def sql_index_place(self, ids):
59         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
60                .format(','.join((str(i) for i in ids)))
61
62
63 class InterpolationRunner(object):
64     """ Returns SQL commands for indexing the address interpolation table
65         location_property_osmline.
66     """
67
68     def name(self):
69         return "interpolation lines (location_property_osmline)"
70
71     def sql_count_objects(self):
72         return """SELECT count(*) FROM location_property_osmline
73                   WHERE indexed_status > 0"""
74
75     def sql_get_objects(self):
76         return """SELECT place_id FROM location_property_osmline
77                   WHERE indexed_status > 0
78                   ORDER BY geometry_sector"""
79
80     def sql_index_place(self, ids):
81         return """UPDATE location_property_osmline
82                   SET indexed_status = 0 WHERE place_id IN ({})"""\
83                .format(','.join((str(i) for i in ids)))
84
85 class BoundaryRunner(object):
86     """ Returns SQL commands for indexing the administrative boundaries
87         of a certain rank.
88     """
89
90     def __init__(self, rank):
91         self.rank = rank
92
93     def name(self):
94         return "boundaries rank {}".format(self.rank)
95
96     def sql_count_objects(self):
97         return """SELECT count(*) FROM placex
98                   WHERE indexed_status > 0
99                     AND rank_search = {}
100                     AND class = 'boundary' and type = 'administrative'""".format(self.rank)
101
102     def sql_get_objects(self):
103         return """SELECT place_id FROM placex
104                   WHERE indexed_status > 0 and rank_search = {}
105                         and class = 'boundary' and type = 'administrative'
106                   ORDER BY partition, admin_level""".format(self.rank)
107
108     def sql_index_place(self, ids):
109         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
110                .format(','.join((str(i) for i in ids)))
111
112 class Indexer(object):
113     """ Main indexing routine.
114     """
115
116     def __init__(self, options):
117         self.minrank = max(0, options.minrank)
118         self.maxrank = min(30, options.maxrank)
119         self.conn = make_connection(options)
120         self.threads = [DBConnection(options) for i in range(options.threads)]
121
122     def index_boundaries(self):
123         log.warning("Starting indexing boundaries using {} threads".format(
124                       len(self.threads)))
125
126         for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
127             self.index(BoundaryRunner(rank))
128
129     def index_by_rank(self):
130         """ Run classic indexing by rank.
131         """
132         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
133                  self.minrank, self.maxrank, len(self.threads)))
134
135         for rank in range(self.minrank, self.maxrank):
136             self.index(RankRunner(rank))
137
138         if self.maxrank == 30:
139             self.index(InterpolationRunner(), 20)
140
141         self.index(RankRunner(self.maxrank), 20)
142
143     def index(self, obj, batch=1):
144         """ Index a single rank or table. `obj` describes the SQL to use
145             for indexing. `batch` describes the number of objects that
146             should be processed with a single SQL statement
147         """
148         log.warning("Starting {}".format(obj.name()))
149
150         cur = self.conn.cursor()
151         cur.execute(obj.sql_count_objects())
152
153         total_tuples = cur.fetchone()[0]
154         log.debug("Total number of rows: {}".format(total_tuples))
155
156         cur.close()
157
158         progress = ProgressLogger(obj.name(), total_tuples)
159
160         if total_tuples > 0:
161             cur = self.conn.cursor(name='places')
162             cur.execute(obj.sql_get_objects())
163
164             next_thread = self.find_free_thread()
165             while True:
166                 places = [p[0] for p in cur.fetchmany(batch)]
167                 if len(places) == 0:
168                     break
169
170                 log.debug("Processing places: {}".format(places))
171                 thread = next(next_thread)
172
173                 thread.perform(obj.sql_index_place(places))
174                 progress.add(len(places))
175
176             cur.close()
177
178             for t in self.threads:
179                 t.wait()
180
181         progress.done()
182
183     def find_free_thread(self):
184         """ Generator that returns the next connection that is free for
185             sending a query.
186         """
187         ready = self.threads
188         command_stat = 0
189
190         while True:
191             for thread in ready:
192                 if thread.is_done():
193                     command_stat += 1
194                     yield thread
195
196             # refresh the connections occasionaly to avoid potential
197             # memory leaks in Postgresql.
198             if command_stat > 100000:
199                 for t in self.threads:
200                     while not t.is_done():
201                         t.wait()
202                     t.connect()
203                 command_stat = 0
204                 ready = self.threads
205             else:
206                 ready, _, _ = select.select(self.threads, [], [])
207
208         assert False, "Unreachable code"
209
210
211 def nominatim_arg_parser():
212     """ Setup the command-line parser for the tool.
213     """
214     def h(s):
215         return re.sub("\s\s+" , " ", s)
216
217     p = ArgumentParser(description="Indexing tool for Nominatim.",
218                        formatter_class=RawDescriptionHelpFormatter)
219
220     p.add_argument('-d', '--database',
221                    dest='dbname', action='store', default='nominatim',
222                    help='Name of the PostgreSQL database to connect to.')
223     p.add_argument('-U', '--username',
224                    dest='user', action='store',
225                    help='PostgreSQL user name.')
226     p.add_argument('-W', '--password',
227                    dest='password_prompt', action='store_true',
228                    help='Force password prompt.')
229     p.add_argument('-H', '--host',
230                    dest='host', action='store',
231                    help='PostgreSQL server hostname or socket location.')
232     p.add_argument('-P', '--port',
233                    dest='port', action='store',
234                    help='PostgreSQL server port')
235     p.add_argument('-b', '--boundary-only',
236                    dest='boundary_only', action='store_true',
237                    help='Only index administrative boundaries (ignores min/maxrank).')
238     p.add_argument('-r', '--minrank',
239                    dest='minrank', type=int, metavar='RANK', default=0,
240                    help='Minimum/starting rank.')
241     p.add_argument('-R', '--maxrank',
242                    dest='maxrank', type=int, metavar='RANK', default=30,
243                    help='Maximum/finishing rank.')
244     p.add_argument('-t', '--threads',
245                    dest='threads', type=int, metavar='NUM', default=1,
246                    help='Number of threads to create for indexing.')
247     p.add_argument('-v', '--verbose',
248                    dest='loglevel', action='count', default=0,
249                    help='Increase verbosity')
250
251     return p
252
253 if __name__ == '__main__':
254     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
255
256     options = nominatim_arg_parser().parse_args(sys.argv[1:])
257
258     log.setLevel(max(3 - options.loglevel, 0) * 10)
259
260     options.password = None
261     if options.password_prompt:
262         password = getpass.getpass("Database password: ")
263         options.password = password
264
265     if options.boundary_only:
266         Indexer(options).index_boundaries()
267     else:
268         Indexer(options).index_by_rank()