2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
6 # Indexing tool for the Nominatim database.
8 # Based on C version by Brian Quinion
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
23 #-----------------------------------------------------------------------------
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
30 from datetime import datetime
32 from psycopg2.extras import wait_select
34 from queue import Queue
36 log = logging.getLogger()
def make_connection(options, asynchronous=False):
    """Open a psycopg2 connection described by the parsed options.

    options -- object providing the attributes ``dbname``, ``user``,
               ``password``, ``host`` and ``port``; each one is handed
               to ``psycopg2.connect`` under the same keyword.
    asynchronous -- forwarded as psycopg2's ``async_`` keyword.

    Returns whatever ``psycopg2.connect`` returns.
    """
    connect_args = {
        'dbname': options.dbname,
        'user': options.user,
        'password': options.password,
        'host': options.host,
        'port': options.port,
        'async_': asynchronous,
    }
    return psycopg2.connect(**connect_args)
43 class IndexingThread(threading.Thread):
45 def __init__(self, queue, barrier, options):
47 self.conn = make_connection(options)
48 self.conn.autocommit = True
50 self.cursor = self.conn.cursor()
51 self.perform("SET lc_messages TO 'C'")
52 self.perform(InterpolationRunner.prepare())
53 self.perform(RankRunner.prepare())
55 self.barrier = barrier
60 item = self.queue.get()
63 elif isinstance(item, str):
67 self.perform(sql, (item,))
69 def perform(self, sql, args=None):
72 self.cursor.execute(sql, args)
74 except psycopg2.extensions.TransactionRollbackError as e:
76 raise RuntimeError("Postgres exception has no error code")
77 if e.pgcode == '40P01':
78 log.info("Deadlock detected, retry.")
84 class Indexer(object):
86 def __init__(self, options):
87 self.options = options
88 self.conn = make_connection(options)
91 self.queue = Queue(maxsize=1000)
92 self.barrier = threading.Barrier(options.threads + 1)
93 for i in range(options.threads):
94 t = IndexingThread(self.queue, self.barrier, options)
95 self.threads.append(t)
99 log.info("Starting indexing rank ({} to {}) using {} threads".format(
100 self.options.minrank, self.options.maxrank,
101 self.options.threads))
103 for rank in range(self.options.minrank, 30):
104 self.index(RankRunner(rank))
106 if self.options.maxrank >= 30:
107 self.index(InterpolationRunner())
108 self.index(RankRunner(30))
111 for t in self.threads:
114 def queue_all(self, item):
115 for t in self.threads:
118 def index(self, obj):
119 log.info("Starting {}".format(obj.name()))
121 self.queue_all(obj.sql_index_place())
124 cur = self.conn.cursor(name="main")
125 cur.execute(obj.sql_index_sectors())
130 log.debug("Total number of rows; {}".format(total_tuples))
132 cur.scroll(0, mode='absolute')
135 rank_start_time = datetime.now()
139 # Should we do the remaining ones together?
140 do_all = total_tuples - done_tuples < len(self.threads) * 1000
142 pcur = self.conn.cursor(name='places')
145 pcur.execute(obj.sql_nosector_places())
147 pcur.execute(obj.sql_sector_places(), (sector, ))
151 log.debug("Processing place {}".format(place_id))
153 self.queue.put(place_id)
166 rank_end_time = datetime.now()
167 diff_seconds = (rank_end_time-rank_start_time).total_seconds()
169 log.info("Done {} in {} @ {} per second - FINISHED {}\n".format(
170 done_tuples, int(diff_seconds),
171 done_tuples/diff_seconds, obj.name()))
174 class RankRunner(object):
176 def __init__(self, rank):
180 return "rank {}".format(self.rank)
184 return """PREPARE rnk_index AS
186 SET indexed_status = 0 WHERE place_id = $1"""
def sql_index_sectors(self):
    """Return the SQL that lists the sectors with pending rows.

    The query yields one row per ``geometry_sector`` of ``placex``
    together with the number of rows in that sector that have
    ``rank_search`` equal to this runner's rank and
    ``indexed_status > 0``, ordered by sector.
    """
    clauses = (
        "SELECT geometry_sector, count(*) FROM placex",
        "WHERE rank_search = {} and indexed_status > 0",
        "GROUP BY geometry_sector",
        "ORDER BY geometry_sector",
    )
    return "\n".join(clauses).format(self.rank)
def sql_nosector_places(self):
    """Return the SQL selecting all pending place ids of this rank.

    Selects ``place_id`` from ``placex`` for every row with
    ``indexed_status > 0`` and ``rank_search`` equal to this runner's
    rank, without restricting the sector; rows come back ordered by
    ``geometry_sector``.
    """
    clauses = (
        "SELECT place_id FROM placex",
        "WHERE indexed_status > 0 and rank_search = {}",
        "ORDER BY geometry_sector",
    )
    return "\n".join(clauses).format(self.rank)
def sql_sector_places(self):
    """Return the SQL selecting pending place ids of one sector.

    The query selects ``place_id`` from ``placex`` for rows with
    ``indexed_status > 0`` that belong to this runner's rank and to
    the sector bound to the single ``%s`` placeholder, which the
    caller supplies at execute time.

    The ``rank_search`` restriction keeps this query consistent with
    the sector summary and the no-sector query of the same runner,
    which both filter on the rank. Without it, every pending row of
    the sector would be returned regardless of its rank.
    """
    clauses = (
        "SELECT place_id FROM placex",
        "WHERE indexed_status > 0 and rank_search = {}",
        "and geometry_sector = %s",
        "ORDER BY geometry_sector",
    )
    # str.format only substitutes the {} field; the %s placeholder is
    # left untouched for the database driver.
    return "\n".join(clauses).format(self.rank)
def sql_index_place(self):
    """Return the SQL that runs the ``rnk_index`` prepared statement.

    The statement takes one ``%s`` placeholder, to be bound by the
    caller when the statement is executed.
    """
    statement = "EXECUTE rnk_index(%s)"
    return statement
208 class InterpolationRunner(object):
211 return "interpolation lines (location_property_osmline)"
215 return """PREPARE ipl_index AS
216 UPDATE location_property_osmline
217 SET indexed_status = 0 WHERE place_id = $1"""
def sql_index_sectors(self):
    """Return the SQL that lists the sectors with pending lines.

    The query yields one row per ``geometry_sector`` of
    ``location_property_osmline`` together with the number of rows in
    that sector that have ``indexed_status > 0``, ordered by sector.
    """
    clauses = (
        "SELECT geometry_sector, count(*) FROM location_property_osmline",
        "WHERE indexed_status > 0",
        "GROUP BY geometry_sector",
        "ORDER BY geometry_sector",
    )
    return "\n".join(clauses)
def sql_nosector_places(self):
    """Return the SQL selecting all pending interpolation line ids.

    Selects ``place_id`` from ``location_property_osmline`` for every
    row with ``indexed_status > 0``, without restricting the sector;
    rows come back ordered by ``geometry_sector``.
    """
    clauses = (
        "SELECT place_id FROM location_property_osmline",
        "WHERE indexed_status > 0",
        "ORDER BY geometry_sector",
    )
    return "\n".join(clauses)
def sql_sector_places(self):
    """Return the SQL selecting pending line ids of one sector.

    Selects ``place_id`` from ``location_property_osmline`` for rows
    with ``indexed_status > 0`` whose ``geometry_sector`` equals the
    value bound to the single ``%s`` placeholder, which the caller
    supplies at execute time.
    """
    clauses = (
        "SELECT place_id FROM location_property_osmline",
        "WHERE indexed_status > 0 and geometry_sector = %s",
        "ORDER BY geometry_sector",
    )
    return "\n".join(clauses)
def sql_index_place(self):
    """Return the SQL that runs the ``ipl_index`` prepared statement.

    The statement takes one ``%s`` placeholder, to be bound by the
    caller when the statement is executed.
    """
    statement = "EXECUTE ipl_index(%s)"
    return statement
239 def nominatim_arg_parser():
240 """ Setup the command-line parser for the tool.
243 return re.sub("\s\s+" , " ", s)
245 p = ArgumentParser(description=__doc__,
246 formatter_class=RawDescriptionHelpFormatter)
248 p.add_argument('-d', '--database',
249 dest='dbname', action='store', default='nominatim',
250 help='Name of the PostgreSQL database to connect to.')
251 p.add_argument('-U', '--username',
252 dest='user', action='store',
253 help='PostgreSQL user name.')
254 p.add_argument('-W', '--password',
255 dest='password_prompt', action='store_true',
256 help='Force password prompt.')
257 p.add_argument('-H', '--host',
258 dest='host', action='store',
259 help='PostgreSQL server hostname or socket location.')
260 p.add_argument('-P', '--port',
261 dest='port', action='store',
262 help='PostgreSQL server port')
263 p.add_argument('-r', '--minrank',
264 dest='minrank', type=int, metavar='RANK', default=0,
265 help='Minimum/starting rank.')
266 p.add_argument('-R', '--maxrank',
267 dest='maxrank', type=int, metavar='RANK', default=30,
268 help='Maximum/finishing rank.')
269 p.add_argument('-t', '--threads',
270 dest='threads', type=int, metavar='NUM', default=1,
271 help='Number of threads to create for indexing.')
272 p.add_argument('-v', '--verbose',
273 dest='loglevel', action='count', default=0,
274 help='Increase verbosity')
if __name__ == '__main__':
    # Script entry point: configure logging, read the command line,
    # optionally ask for the database password and run the indexer.
    logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')

    options = nominatim_arg_parser().parse_args(sys.argv[1:])

    # Without -v the level is 30 (WARNING); every -v lowers it by one
    # standard logging step of 10, never going below 0.
    level_steps = max(3 - options.loglevel, 0)
    log.setLevel(level_steps * 10)

    # A password is only set when the prompt was explicitly requested.
    if options.password_prompt:
        options.password = getpass.getpass("Database password: ")
    else:
        options.password = None

    Indexer(options).run()