]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
619070604e1be63af9e9e881bbb910117bf1d6f4
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import threading
34 from queue import Queue
35
36 log = logging.getLogger()
37
def make_connection(options, asynchronous=False):
    """ Open a new psycopg2 connection from the settings in *options*.

        *options* must provide dbname, user, password, host and port.
        When *asynchronous* is true the connection is opened in psycopg2's
        asynchronous mode (passed through as ``async_``).
    """
    settings = dict(dbname=options.dbname,
                    user=options.user,
                    password=options.password,
                    host=options.host,
                    port=options.port)
    return psycopg2.connect(async_=asynchronous, **settings)
42
class IndexingThread(threading.Thread):
    """ Worker thread that runs the current indexing statement for every
        item it takes from the shared queue.

        Queue protocol (as produced by Indexer):
          * a str item replaces the statement to execute, after which the
            thread waits at the shared barrier;
          * None stops the thread;
          * any other item is passed as the single parameter of the current
            statement.
    """

    def __init__(self, queue, barrier, options):
        super().__init__()
        self.conn = make_connection(options)
        # Every statement is committed on its own.
        self.conn.autocommit = True

        self.cursor = self.conn.cursor()
        # 'C' disables translation of server messages.
        self.perform("SET lc_messages TO 'C'")
        # Create the prepared statements that the EXECUTE strings sent over
        # the queue refer to.
        self.perform(InterpolationRunner.prepare())
        self.perform(RankRunner.prepare())
        self.queue = queue
        self.barrier = barrier

    def run(self):
        """ Consume the queue until a None item arrives.

            If anything goes wrong, the barrier is aborted before the error
            propagates. The main thread and the other workers waiting on it
            then get BrokenBarrierError instead of blocking forever on a
            party that will never arrive.
        """
        sql = None
        try:
            while True:
                item = self.queue.get()
                if item is None:
                    break
                elif isinstance(item, str):
                    sql = item
                    self.barrier.wait()
                else:
                    self.perform(sql, (item,))
        except BaseException:
            self.barrier.abort()
            raise

    def perform(self, sql, args=None):
        """ Execute *sql* with *args* on this thread's cursor.

            The statement is retried as long as Postgres reports a deadlock
            (SQLSTATE 40P01). Any other TransactionRollbackError is re-raised.

            Raises RuntimeError when the rollback error carries no SQLSTATE.
        """
        while True:
            try:
                self.cursor.execute(sql, args)
                return
            except psycopg2.extensions.TransactionRollbackError as e:
                if e.pgcode is None:
                    raise RuntimeError("Postgres exception has no error code") from e
                if e.pgcode == '40P01':
                    log.info("Deadlock detected, retry.")
                else:
                    raise
81
82
83
class Indexer(object):
    """ Drives indexing of placex and interpolation lines.

        The main connection enumerates pending objects sector by sector and
        pushes their place_ids into a bounded queue. A pool of
        IndexingThread workers consumes the queue and runs the updates.
    """

    def __init__(self, options):
        self.options = options
        self.conn = make_connection(options)

        self.threads = []
        # Bounded, so the producer cannot run arbitrarily far ahead of the
        # workers.
        self.queue = Queue(maxsize=1000)
        # One party per worker plus the main thread. Used to switch all
        # workers to a new statement and to wait until they have drained the
        # queue.
        self.barrier = threading.Barrier(options.threads + 1)
        for _ in range(options.threads):
            t = IndexingThread(self.queue, self.barrier, options)
            self.threads.append(t)
            t.start()

    def run(self):
        """ Index ranks options.minrank .. options.maxrank in ascending order,
            then stop and join the worker threads.

            Interpolation lines and rank 30 are processed only when
            options.maxrank >= 30.
        """
        log.info("Starting indexing rank ({} to {}) using {} threads".format(
                 self.options.minrank, self.options.maxrank,
                 self.options.threads))

        # Rank 30 gets special treatment below, so the plain loop stops at
        # 29. It must also stop at maxrank, which was previously ignored for
        # ranks below 30.
        last_plain_rank = min(self.options.maxrank, 29)
        for rank in range(self.options.minrank, last_plain_rank + 1):
            self.index(RankRunner(rank))

        if self.options.maxrank >= 30:
            self.index(InterpolationRunner())
            self.index(RankRunner(30))

        # One None per worker tells each thread to exit.
        self.queue_all(None)
        for t in self.threads:
            t.join()

    def queue_all(self, item):
        """ Put *item* into the queue once for every worker thread. """
        for _ in self.threads:
            self.queue.put(item)

    def index(self, obj):
        """ Index all pending objects described by runner *obj*.

            *obj* provides name(), sql_index_place(), sql_index_sectors(),
            sql_sector_places() and sql_nosector_places().
        """
        log.info("Starting {}".format(obj.name()))

        # Switch every worker to this runner's statement. The barrier
        # ensures all have switched before any place_id is queued.
        self.queue_all(obj.sql_index_place())
        self.barrier.wait()

        # The sector list is read twice (count pass, then work pass), which
        # requires moving the server-side cursor backwards. Declare it
        # scrollable explicitly instead of relying on the query plan.
        cur = self.conn.cursor(name="main", scrollable=True)
        cur.execute(obj.sql_index_sectors())

        total_tuples = 0
        for r in cur:
            total_tuples += r[1]
        log.debug("Total number of rows; {}".format(total_tuples))

        cur.scroll(0, mode='absolute')

        done_tuples = 0
        rank_start_time = datetime.now()
        for r in cur:
            sector = r[0]

            # Once fewer than ~1000 rows per worker remain, fetch all
            # remaining pending rows with one query instead of per sector.
            do_all = total_tuples - done_tuples < len(self.threads) * 1000

            pcur = self.conn.cursor(name='places')

            if do_all:
                pcur.execute(obj.sql_nosector_places())
            else:
                pcur.execute(obj.sql_sector_places(), (sector, ))

            for place in pcur:
                place_id = place[0]
                log.debug("Processing place {}".format(place_id))

                self.queue.put(place_id)
                done_tuples += 1

            pcur.close()

            if do_all:
                break

        cur.close()

        # The empty statement makes every worker hit the barrier only after
        # it has worked through all place_ids queued before it.
        self.queue_all("")
        self.barrier.wait()

        rank_end_time = datetime.now()
        diff_seconds = (rank_end_time-rank_start_time).total_seconds()
        # Avoid ZeroDivisionError when the interval is too short to measure.
        rate = done_tuples / diff_seconds if diff_seconds > 0 else 0.0

        log.info("Done {} in {} @ {} per second - FINISHED {}\n".format(
                 done_tuples, int(diff_seconds),
                 rate, obj.name()))
172
173
class RankRunner(object):
    """ Provides the SQL for indexing the placex rows of one search rank. """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        return "rank {}".format(self.rank)

    @classmethod
    def prepare(cls):
        """ SQL that creates the prepared statement used by sql_index_place(). """
        return """PREPARE rnk_index AS
                  UPDATE placex
                  SET indexed_status = 0 WHERE place_id = $1"""

    def sql_index_sectors(self):
        """ Pending rows of this rank, counted per geometry_sector. """
        return """SELECT geometry_sector, count(*) FROM placex
                  WHERE rank_search = {} and indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_nosector_places(self):
        """ All pending place_ids of this rank, regardless of sector. """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_sector_places(self):
        """ Pending place_ids of this rank within one sector (%s parameter).

            The rank restriction is required. Without it, the query also
            returns pending places of other ranks in the same sector. Those
            would be indexed out of the rank order followed by the caller and
            would not match the per-rank counts from sql_index_sectors().
        """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and geometry_sector = %s
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_index_place(self):
        """ Statement that executes the prepared update for one place_id. """
        return "EXECUTE rnk_index(%s)"
206
207
class InterpolationRunner(object):
    """ Provides the SQL for indexing the interpolation lines stored in
        location_property_osmline.

        All statements are fixed strings kept as class attributes. The
        accessor methods simply return them.
    """

    # Prepared update that marks one line (place_id = $1) as indexed.
    _PREPARE_SQL = """PREPARE ipl_index AS
                  UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id = $1"""

    # Pending lines counted per geometry sector, in sector order.
    _SECTORS_SQL = """SELECT geometry_sector, count(*) FROM location_property_osmline
                  WHERE indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector"""

    # Every pending line, regardless of sector.
    _ALL_PLACES_SQL = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""

    # Pending lines of a single sector; %s receives the sector value.
    _SECTOR_PLACES_SQL = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0 and geometry_sector = %s
                  ORDER BY geometry_sector"""

    # Runs the prepared update for one place_id.
    _INDEX_PLACE_SQL = "EXECUTE ipl_index(%s)"

    def name(self):
        return "interpolation lines (location_property_osmline)"

    @classmethod
    def prepare(cls):
        return cls._PREPARE_SQL

    def sql_index_sectors(self):
        return self._SECTORS_SQL

    def sql_nosector_places(self):
        return self._ALL_PLACES_SQL

    def sql_sector_places(self):
        return self._SECTOR_PLACES_SQL

    def sql_index_place(self):
        return self._INDEX_PLACE_SQL
237
238
def nominatim_arg_parser():
    """ Setup the command-line parser for the tool.

        The parsed namespace provides dbname, user, password_prompt, host,
        port, minrank, maxrank, threads and loglevel.
    """
    def positive_int(value):
        # --threads sizes the worker pool and its barrier (threads + 1
        # parties). With zero workers nobody drains the bounded queue, and
        # negative values make the barrier invalid.
        try:
            num = int(value)
        except ValueError:
            raise ArgumentTypeError(
                "invalid int value: '{}'".format(value)) from None
        if num < 1:
            raise ArgumentTypeError(
                "must be at least 1, got {}".format(num))
        return num

    # The module has no docstring, so fall back to an explicit description.
    p = ArgumentParser(description=__doc__ or
                       "Indexing tool for the Nominatim database.",
                       formatter_class=RawDescriptionHelpFormatter)

    p.add_argument('-d', '--database',
                   dest='dbname', action='store', default='nominatim',
                   help='Name of the PostgreSQL database to connect to.')
    p.add_argument('-U', '--username',
                   dest='user', action='store',
                   help='PostgreSQL user name.')
    p.add_argument('-W', '--password',
                   dest='password_prompt', action='store_true',
                   help='Force password prompt.')
    p.add_argument('-H', '--host',
                   dest='host', action='store',
                   help='PostgreSQL server hostname or socket location.')
    p.add_argument('-P', '--port',
                   dest='port', action='store',
                   help='PostgreSQL server port')
    p.add_argument('-r', '--minrank',
                   dest='minrank', type=int, metavar='RANK', default=0,
                   help='Minimum/starting rank.')
    p.add_argument('-R', '--maxrank',
                   dest='maxrank', type=int, metavar='RANK', default=30,
                   help='Maximum/finishing rank.')
    p.add_argument('-t', '--threads',
                   dest='threads', type=positive_int, metavar='NUM', default=1,
                   help='Number of threads to create for indexing.')
    p.add_argument('-v', '--verbose',
                   dest='loglevel', action='count', default=0,
                   help='Increase verbosity')

    return p
277
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')

    options = nominatim_arg_parser().parse_args(sys.argv[1:])

    # Each -v lowers the threshold by 10: no flag -> 30 (WARNING),
    # -v -> 20 (INFO), -vv -> 10 (DEBUG), three or more -> 0.
    verbosity = options.loglevel
    log.setLevel(10 * max(0, 3 - verbosity))

    # Prompt for a password only when -W was given; otherwise leave it unset.
    options.password = (getpass.getpass("Database password: ")
                        if options.password_prompt else None)

    Indexer(options).run()