git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
fix SQL and some other stuff
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import select
34
35 log = logging.getLogger()
36
def make_connection(options, asynchronous=False):
    """ Open a psycopg2 connection described by `options`.

        `options` must provide the attributes `dbname`, `user`, `password`,
        `host` and `port`. When `asynchronous` is true, the flag is handed
        to psycopg2 through its `async_` keyword.

        Returns whatever `psycopg2.connect()` returns.
    """
    connect_args = dict(
        dbname=options.dbname,
        user=options.user,
        password=options.password,
        host=options.host,
        port=options.port,
        async_=asynchronous,
    )
    return psycopg2.connect(**connect_args)
41
class IndexingThread(object):
    """ One asynchronous database connection that executes index statements.

        The connection is opened in asynchronous mode, so `perform()` only
        sends a statement; completion is observed later through `wait()` or
        `is_done()`. `fileno()` is provided so that instances can be passed
        directly to `select.select()`.
    """

    def __init__(self, thread_num, options):
        """ Open the connection and prepare the statements used for indexing.

            thread_num -- number used in the debug log message only.
            options    -- connection settings, passed to make_connection().
        """
        log.debug("Creating thread {}".format(thread_num))
        self.thread_num = thread_num

        # Statement currently in flight. Initialised first so that wait()
        # and the deadlock handling never see a missing attribute.
        self.current_query = None
        self.current_params = None

        self.conn = make_connection(options, asynchronous=True)
        self.wait()

        self.cursor = self.conn.cursor()
        setup_statements = (
            "SET lc_messages TO 'C'",
            InterpolationRunner.prepare(),
            RankRunner.prepare(),
        )
        # An asynchronous connection handles one statement at a time, so
        # each setup statement is awaited before the next one is sent.
        for sql in setup_statements:
            self.perform(sql)
            self.wait()

    def wait(self):
        """ Block until the connection has finished its pending operation.

            A statement aborted by a deadlock (SQLSTATE 40P01) is sent again
            and awaited anew; any other rollback error is re-raised.
        """
        while True:
            try:
                wait_select(self.conn)
            except psycopg2.extensions.TransactionRollbackError as e:
                if not self._retry_after_deadlock(e):
                    raise
            else:
                self.current_query = None
                return

    def perform(self, sql, args=None):
        """ Send `sql` with parameters `args` without waiting for the result.

            The statement and its parameters are remembered so that they can
            be re-sent if the server aborts them because of a deadlock.
        """
        self.current_query = sql
        self.current_params = args
        self.cursor.execute(sql, args)

    def fileno(self):
        """ Return the file descriptor of the underlying connection.
        """
        return self.conn.fileno()

    def is_done(self):
        """ Check without blocking whether the connection is free.

            Returns True when no statement is pending or when the pending
            statement has completed, False otherwise. A statement aborted
            by a deadlock (SQLSTATE 40P01) is re-sent and reported as not
            done; other rollback errors are re-raised.
        """
        if self.current_query is None:
            return True

        try:
            if self.conn.poll() == psycopg2.extensions.POLL_OK:
                self.current_query = None
                return True
        except psycopg2.extensions.TransactionRollbackError as e:
            if not self._retry_after_deadlock(e):
                raise

        return False

    def _retry_after_deadlock(self, error):
        """ Re-send the pending statement if `error` reports a deadlock.

            Returns True when the statement was sent again and False when
            the error is not a deadlock (or nothing is pending), in which
            case the caller is expected to re-raise it. Raises RuntimeError
            when the error carries no SQLSTATE code at all.
        """
        if error.pgcode is None:
            raise RuntimeError("Postgres exception has no error code")
        if error.pgcode != '40P01' or self.current_query is None:
            return False

        log.info("Deadlock detected, retry.")
        self.cursor.execute(self.current_query, self.current_params)
        return True
89
90
91
class Indexer(object):
    """ Runs the indexing: reads the places that still need indexing over a
        main connection and hands them one by one to a pool of
        IndexingThread connections.
    """

    def __init__(self, options):
        """ Open the main connection and `options.threads` worker connections.
        """
        self.options = options
        self.conn = make_connection(options)

        self.threads = [IndexingThread(i, options)
                        for i in range(options.threads)]

    def run(self):
        """ Index all ranks from `options.minrank` to `options.maxrank`.

            Both bounds are inclusive. Ranks below 30 are processed in
            ascending order. When the maximum rank is 30 or more, the
            interpolation lines are indexed next, followed by rank 30.
        """
        log.info("Starting indexing rank ({} to {}) using {} threads".format(
                 self.options.minrank, self.options.maxrank,
                 self.options.threads))

        for rank in self._ranks_below_30():
            self.index(RankRunner(rank))

        if self.options.maxrank >= 30:
            self.index(InterpolationRunner())
            self.index(RankRunner(30))

    def _ranks_below_30(self):
        """ Return the requested ranks below 30, `maxrank` included.
        """
        return range(self.options.minrank,
                     min(self.options.maxrank + 1, 30))

    def index(self, obj):
        """ Index everything that the runner `obj` selects.

            `obj` must provide name(), sql_index_sectors(),
            sql_sector_places(), sql_nosector_places() and
            sql_index_place(). Places are processed sector by sector; each
            place id is handed to the next free worker connection.
        """
        log.info("Starting {}".format(obj.name()))

        # Read the (sector, count) rows once. They are needed both for the
        # total and for the processing loop, and keeping them in memory
        # avoids having to rewind a server-side cursor.
        with self.conn.cursor(name='main') as cur:
            cur.execute(obj.sql_index_sectors())
            sectors = cur.fetchall()

        total_tuples = sum(row[1] for row in sectors)
        log.debug("Total number of rows; {}".format(total_tuples))

        next_thread = self.find_free_thread()
        done_tuples = 0
        rank_start_time = datetime.now()

        sector_sql = obj.sql_sector_places()
        index_sql = obj.sql_index_place()
        min_grouped_tuples = total_tuples - len(self.threads) * 1000

        for row in sectors:
            sector = row[0]

            # Once fewer than 1000 places per worker remain, fetch all
            # remaining places in one query instead of going by sector.
            do_all = done_tuples > min_grouped_tuples

            with self.conn.cursor(name='places') as pcur:
                if do_all:
                    pcur.execute(obj.sql_nosector_places())
                else:
                    pcur.execute(sector_sql, (sector, ))

                for place in pcur:
                    place_id = place[0]
                    log.debug("Processing place {}".format(place_id))
                    thread = next(next_thread)

                    thread.perform(index_sql, (place_id,))
                    done_tuples += 1

            if do_all:
                break

        # Make sure every statement handed out has actually completed.
        for t in self.threads:
            t.wait()

        rank_end_time = datetime.now()
        diff_seconds = (rank_end_time-rank_start_time).total_seconds()
        # Guard against a zero interval on clocks with coarse resolution.
        rate = done_tuples/diff_seconds if diff_seconds > 0 else 0.0

        log.info("Done {}/{} in {} @ {} per second - FINISHED {}\n".format(
                 done_tuples, total_tuples, int(diff_seconds),
                 rate, obj.name()))

    def find_free_thread(self):
        """ Generator that yields worker connections ready for a new statement.

            Every candidate is checked with is_done(). When a pass over the
            candidates is exhausted, select() blocks until at least one
            connection becomes readable and those connections form the
            next set of candidates. The generator never terminates.
        """
        ready = self.threads

        while True:
            for thread in ready:
                if thread.is_done():
                    yield thread

            ready, _, _ = select.select(self.threads, [], [])
184
class RankRunner(object):
    """ Supplies the SQL needed to index the rows of `placex` that have a
        given `rank_search` value.
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        """ Label for this runner, used in log output.
        """
        label = "rank {}"
        return label.format(self.rank)

    @classmethod
    def prepare(cls):
        """ SQL that defines the prepared statement `rnk_index`.
        """
        statement = """PREPARE rnk_index AS
                  UPDATE placex
                  SET indexed_status = 0 WHERE place_id = $1"""
        return statement

    def sql_index_sectors(self):
        """ SQL returning each geometry sector of this rank that still has
            unindexed rows, together with the number of such rows.
        """
        template = """SELECT geometry_sector, count(*) FROM placex
                  WHERE rank_search = {rank} and indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector"""
        return template.format(rank=self.rank)

    def sql_nosector_places(self):
        """ SQL returning the ids of all unindexed rows of this rank.
        """
        template = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {rank}
                  ORDER BY geometry_sector"""
        return template.format(rank=self.rank)

    def sql_sector_places(self):
        """ SQL returning the ids of the unindexed rows of this rank in one
            sector; the sector is left as a `%s` query parameter.
        """
        template = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {rank}
                        and geometry_sector = %s"""
        return template.format(rank=self.rank)

    def sql_index_place(self):
        """ SQL executing `rnk_index` with the place id as `%s` parameter.
        """
        return "EXECUTE rnk_index(%s)"
217
218
class InterpolationRunner(object):
    """ Supplies the SQL needed to index the rows of the table
        `location_property_osmline`.
    """

    def name(self):
        """ Label for this runner, used in log output.
        """
        label = "interpolation lines (location_property_osmline)"
        return label

    @classmethod
    def prepare(cls):
        """ SQL that defines the prepared statement `ipl_index`.
        """
        statement = """PREPARE ipl_index AS
                  UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id = $1"""
        return statement

    def sql_index_sectors(self):
        """ SQL returning each geometry sector that still has unindexed
            rows, together with the number of such rows.
        """
        query = """SELECT geometry_sector, count(*) FROM location_property_osmline
                  WHERE indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector"""
        return query

    def sql_nosector_places(self):
        """ SQL returning the ids of all unindexed rows.
        """
        query = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""
        return query

    def sql_sector_places(self):
        """ SQL returning the ids of the unindexed rows in one sector; the
            sector is left as a `%s` query parameter.
        """
        query = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0 and geometry_sector = %s
                  ORDER BY geometry_sector"""
        return query

    def sql_index_place(self):
        """ SQL executing `ipl_index` with the place id as `%s` parameter.
        """
        return "EXECUTE ipl_index(%s)"
248
249
def nominatim_arg_parser():
    """ Setup the command-line parser for the tool.

        Returns an ArgumentParser whose parsed result provides the
        attributes `dbname`, `user`, `password_prompt`, `host`, `port`,
        `minrank`, `maxrank`, `threads` and `loglevel`.
    """
    def positive_int(value):
        """ argparse type that accepts integers of 1 and above only.
        """
        number = int(value)
        if number < 1:
            raise ArgumentTypeError(
                "must be at least 1, got {}".format(value))
        return number

    p = ArgumentParser(description=__doc__,
                       formatter_class=RawDescriptionHelpFormatter)

    p.add_argument('-d', '--database',
                   dest='dbname', action='store', default='nominatim',
                   help='Name of the PostgreSQL database to connect to.')
    p.add_argument('-U', '--username',
                   dest='user', action='store',
                   help='PostgreSQL user name.')
    p.add_argument('-W', '--password',
                   dest='password_prompt', action='store_true',
                   help='Force password prompt.')
    p.add_argument('-H', '--host',
                   dest='host', action='store',
                   help='PostgreSQL server hostname or socket location.')
    p.add_argument('-P', '--port',
                   dest='port', action='store',
                   help='PostgreSQL server port')
    p.add_argument('-r', '--minrank',
                   dest='minrank', type=int, metavar='RANK', default=0,
                   help='Minimum/starting rank.')
    p.add_argument('-R', '--maxrank',
                   dest='maxrank', type=int, metavar='RANK', default=30,
                   help='Maximum/finishing rank.')
    # Without at least one worker connection there is nothing to hand the
    # index statements to, so zero and negative values are rejected here.
    p.add_argument('-t', '--threads',
                   dest='threads', type=positive_int, metavar='NUM',
                   default=1,
                   help='Number of threads to create for indexing.')
    p.add_argument('-v', '--verbose',
                   dest='loglevel', action='count', default=0,
                   help='Increase verbosity')

    return p
288
if __name__ == '__main__':
    # Script entry point: configure logging, read the command line, ask for
    # the password if requested and start the indexing run.
    logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')

    options = nominatim_arg_parser().parse_args(sys.argv[1:])

    # The level starts at 30 and drops by 10 for every -v, but never
    # below 0.
    level_steps = max(3 - options.loglevel, 0)
    log.setLevel(level_steps * 10)

    # The password is only ever obtained interactively; without -W it
    # stays None.
    options.password = (getpass.getpass("Database password: ")
                        if options.password_prompt else None)

    Indexer(options).run()