]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
023b4b23b9ff2fa6614e29ad6130cbd82269c5b8
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import select
34
35 log = logging.getLogger()
36
def make_connection(options, asynchronous=False):
    """ Open a new psycopg2 connection to the database described by `options`.

        `options` must provide the attributes `dbname`, `user`, `password`,
        `host` and `port`. When `asynchronous` is true, the connection is
        requested in psycopg2's asynchronous mode (`async_` parameter).
    """
    connect_args = {
        'dbname': options.dbname,
        'user': options.user,
        'password': options.password,
        'host': options.host,
        'port': options.port,
        'async_': asynchronous,
    }
    return psycopg2.connect(**connect_args)
41
class IndexingThread(object):
    """ A worker that runs indexing statements on its own asynchronous
        database connection.

        Despite the name this is not an OS thread: it wraps one asynchronous
        psycopg2 connection on which a single statement may be in flight at
        a time. `current_query`/`current_params` describe the statement that
        is currently in flight (both are None when the worker is idle).
    """

    def __init__(self, thread_num, options):
        log.debug("Creating thread {}".format(thread_num))
        self.thread_num = thread_num
        # Bookkeeping for the in-flight statement; set before first use.
        self.current_query = None
        self.current_params = None

        self.conn = make_connection(options, asynchronous=True)
        self.wait()

        self.cursor = self.conn.cursor()
        self.perform("SET lc_messages TO 'C'")
        self.wait()
        # Prepare the statements used by the runners' sql_index_place().
        self.perform(InterpolationRunner.prepare())
        self.wait()
        self.perform(RankRunner.prepare())
        self.wait()

    def wait(self):
        """ Block until the connection has finished its pending operation
            and mark the worker as idle.
        """
        wait_select(self.conn)
        self.current_query = None
        self.current_params = None

    def perform(self, sql, args=None):
        """ Send `sql` with parameters `args` to the server without waiting
            for the result. Query and parameters are remembered so that the
            statement can be replayed after a deadlock.
        """
        self.current_query = sql
        self.current_params = args
        self.cursor.execute(sql, args)

    def is_done(self):
        """ Check if the worker is free to take a new statement.

            Returns True when no statement is pending or the pending one has
            completed, False otherwise. If the server reports a deadlock
            (SQLSTATE 40P01), the statement is sent again and False is
            returned. Other rollback errors are re-raised; a rollback error
            without an error code raises RuntimeError.
        """
        if self.current_query is None:
            return True

        try:
            if self.conn.poll() == psycopg2.extensions.POLL_OK:
                self.current_query = None
                self.current_params = None
                return True
        except psycopg2.extensions.TransactionRollbackError as e:
            if e.pgcode is None:
                raise RuntimeError("Postgres exception has no error code")
            if e.pgcode == '40P01':
                log.info("Deadlock detected, retry.")
                # Replay the exact statement that was rolled back.
                self.cursor.execute(self.current_query, self.current_params)
            else:
                raise

        return False
84
85
86
class Indexer(object):
    """ Main indexing routine.

        Holds one synchronous connection that is used to enumerate the
        places to index and `options.threads` IndexingThread workers that
        execute the actual per-place index statements.
    """

    def __init__(self, options):
        self.options = options
        self.conn = make_connection(options)

        self.threads = []
        self.poll = select.poll()
        for i in range(options.threads):
            t = IndexingThread(i, options)
            self.threads.append(t)
            # POLLIN is the event constant that belongs to select.poll();
            # the EPOLL* constants are only defined on Linux.
            self.poll.register(t.conn.fileno(), select.POLLIN)
        self.next_thread = 0

    def run(self):
        """ Index all ranks from `options.minrank` to `options.maxrank`
            (inclusive). Rank 30 is preceded by the interpolation lines.
        """
        log.info("Starting indexing rank ({} to {}) using {} threads".format(
                 self.options.minrank, self.options.maxrank,
                 self.options.threads))

        # Ranks below 30 are handled here; never go beyond the requested
        # maximum rank.
        last_plain_rank = min(self.options.maxrank, 29)
        for rank in range(self.options.minrank, last_plain_rank + 1):
            self.index(RankRunner(rank))

        if self.options.maxrank >= 30:
            self.index(InterpolationRunner())
            self.index(RankRunner(30))

    def index(self, obj):
        """ Index everything that `obj` selects.

            `obj` is a runner object providing name(), sql_index_sectors(),
            sql_sector_places(), sql_nosector_places() and sql_index_place().
            Places are processed sector by sector; once fewer than
            1000 * <number of threads> rows remain, the rest is fetched in
            one go.
        """
        log.info("Starting {}".format(obj.name()))

        # The cursor is read twice (count, then process), so it must be
        # declared scrollable to allow rewinding it.
        cur = self.conn.cursor(name="main", scrollable=True)
        cur.execute(obj.sql_index_sectors())

        total_tuples = 0
        for r in cur:
            total_tuples += r[1]
        log.debug("Total number of rows; {}".format(total_tuples))

        cur.scroll(0, mode='absolute')

        done_tuples = 0
        rank_start_time = datetime.now()
        for r in cur:
            sector = r[0]

            # Should we do the remaining ones together?
            do_all = total_tuples - done_tuples < len(self.threads) * 1000

            pcur = self.conn.cursor(name='places')

            if do_all:
                pcur.execute(obj.sql_nosector_places())
            else:
                pcur.execute(obj.sql_sector_places(), (sector, ))

            for place in pcur:
                place_id = place[0]
                log.debug("Processing place {}".format(place_id))
                thread = self.find_free_thread()

                thread.perform(obj.sql_index_place(), (place_id,))
                done_tuples += 1

            pcur.close()

            if do_all:
                break

        cur.close()

        # Let all outstanding statements finish before reporting.
        for t in self.threads:
            t.wait()

        rank_end_time = datetime.now()
        diff_seconds = (rank_end_time-rank_start_time).total_seconds()
        # Guard against a zero-length interval on coarse clocks.
        per_second = done_tuples/diff_seconds if diff_seconds > 0 else 0.0

        log.info("Done {} in {} @ {} per second - FINISHED {}\n".format(
                 done_tuples, int(diff_seconds),
                 per_second, obj.name()))

    def find_free_thread(self):
        """ Return a worker that can accept a new statement, blocking on
            the workers' sockets until one becomes available.
        """
        while True:
            for t in self.threads:
                if t.is_done():
                    return t

            # Nobody is free: sleep until one of the connections has news.
            self.poll.poll()
175
class RankRunner(object):
    """ Provides the SQL statements for indexing the rows of table `placex`
        that have a given `rank_search`.
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        """ Human-readable name of this indexing step, used for logging.
        """
        return "rank {}".format(self.rank)

    @classmethod
    def prepare(cls):
        """ SQL that creates the prepared statement run by sql_index_place().
        """
        return """PREPARE rnk_index AS
                  UPDATE placex
                  SET indexed_status = 0 WHERE place_id = $1"""

    def sql_index_sectors(self):
        """ SQL returning (geometry_sector, row count) for all rows of this
            rank that still need indexing.
        """
        return """SELECT geometry_sector, count(*) FROM placex
                  WHERE rank_search = {} and indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_nosector_places(self):
        """ SQL returning the ids of all to-be-indexed places of this rank,
            regardless of sector.
        """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_sector_places(self):
        """ SQL returning the ids of the to-be-indexed places of this rank
            within one sector. The sector is the single query parameter.
        """
        # The rank filter is required here as well, otherwise places of
        # other ranks in the same sector would be indexed out of order.
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and geometry_sector = %s
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_index_place(self):
        """ SQL that indexes a single place; the place id is the parameter.
        """
        return "EXECUTE rnk_index(%s)"
208
209
class InterpolationRunner(object):
    """ Provides the SQL statements for indexing the rows of table
        `location_property_osmline`.
    """

    @classmethod
    def prepare(cls):
        """ SQL that creates the prepared statement run by sql_index_place().
        """
        statement = """PREPARE ipl_index AS
                  UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id = $1"""
        return statement

    def name(self):
        """ Human-readable name of this indexing step, used for logging.
        """
        return "interpolation lines (location_property_osmline)"

    def sql_index_sectors(self):
        """ SQL returning (geometry_sector, row count) for all rows that
            still need indexing.
        """
        query = """SELECT geometry_sector, count(*) FROM location_property_osmline
                  WHERE indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector"""
        return query

    def sql_sector_places(self):
        """ SQL returning the ids of the to-be-indexed rows within one
            sector. The sector is the single query parameter.
        """
        query = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0 and geometry_sector = %s
                  ORDER BY geometry_sector"""
        return query

    def sql_nosector_places(self):
        """ SQL returning the ids of all to-be-indexed rows, regardless of
            sector.
        """
        query = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""
        return query

    def sql_index_place(self):
        """ SQL that indexes a single row; the place id is the parameter.
        """
        return "EXECUTE ipl_index(%s)"
239
240
def nominatim_arg_parser():
    """ Setup the command-line parser for the tool.

        Returns an ArgumentParser whose parsed namespace has the attributes
        dbname, user, password_prompt, host, port, minrank, maxrank, threads
        and loglevel.
    """
    def positive_int(value):
        # argparse type function: at least one indexing thread is needed,
        # with none the indexer would wait forever for a free worker.
        try:
            number = int(value)
        except ValueError:
            raise ArgumentTypeError("invalid int value: {!r}".format(value))
        if number < 1:
            raise ArgumentTypeError(
                "must be at least 1, got {}".format(number))
        return number

    p = ArgumentParser(description=__doc__,
                       formatter_class=RawDescriptionHelpFormatter)

    p.add_argument('-d', '--database',
                   dest='dbname', action='store', default='nominatim',
                   help='Name of the PostgreSQL database to connect to.')
    p.add_argument('-U', '--username',
                   dest='user', action='store',
                   help='PostgreSQL user name.')
    p.add_argument('-W', '--password',
                   dest='password_prompt', action='store_true',
                   help='Force password prompt.')
    p.add_argument('-H', '--host',
                   dest='host', action='store',
                   help='PostgreSQL server hostname or socket location.')
    p.add_argument('-P', '--port',
                   dest='port', action='store',
                   help='PostgreSQL server port')
    p.add_argument('-r', '--minrank',
                   dest='minrank', type=int, metavar='RANK', default=0,
                   help='Minimum/starting rank.')
    p.add_argument('-R', '--maxrank',
                   dest='maxrank', type=int, metavar='RANK', default=30,
                   help='Maximum/finishing rank.')
    p.add_argument('-t', '--threads',
                   dest='threads', type=positive_int, metavar='NUM', default=1,
                   help='Number of threads to create for indexing.')
    p.add_argument('-v', '--verbose',
                   dest='loglevel', action='count', default=0,
                   help='Increase verbosity')

    return p
279
if __name__ == '__main__':
    logging.basicConfig(format='%(levelname)s: %(message)s', stream=sys.stderr)

    options = nominatim_arg_parser().parse_args(sys.argv[1:])

    # Every -v lowers the threshold by one logging level (10 units),
    # starting from 30 and never going below 0.
    threshold = 10 * max(0, 3 - options.loglevel)
    log.setLevel(threshold)

    # Only ask for a password when explicitly requested on the command line.
    if options.password_prompt:
        options.password = getpass.getpass("Database password: ")
    else:
        options.password = None

    Indexer(options).run()
290         options.password = password
291
292     Indexer(options).run()