]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
610e3de207c7542862c53d8b36c4023614855c7b
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import select
34
# Root logger shared by the whole script; its level is set in the
# __main__ section.
log = logging.getLogger()


def make_connection(options, asynchronous=False):
    """ Open a connection to the PostgreSQL database.

        `options` must provide the attributes dbname, user, password,
        host and port. When `asynchronous` is true, the connection is
        opened in psycopg2's asynchronous mode.
    """
    connect_args = {
        'dbname': options.dbname,
        'user': options.user,
        'password': options.password,
        'host': options.host,
        'port': options.port,
        'async_': asynchronous,
    }
    return psycopg2.connect(**connect_args)
41
class IndexingThread(object):
    """ A worker that runs one SQL statement at a time over its own
        asynchronous database connection.

        The statement currently in flight and its parameters are remembered
        so that the statement can be sent again when PostgreSQL aborts it
        because of a deadlock.
    """

    def __init__(self, thread_num, options):
        """ Open the asynchronous connection for worker `thread_num` and
            prepare its cursor. Blocks until the connection is usable.
        """
        log.debug("Creating thread {}".format(thread_num))
        self.thread_num = thread_num
        # Nothing is in flight yet.
        self.current_query = None
        self.current_params = None

        self.conn = make_connection(options, asynchronous=True)
        self.wait()

        self.cursor = self.conn.cursor()
        self.perform("SET lc_messages TO 'C'")
        self.wait()

    def wait(self):
        """ Block until the pending operation on the connection has
            finished and mark the worker as idle.
        """
        wait_select(self.conn)
        self.current_query = None
        self.current_params = None

    def perform(self, sql, args=None):
        """ Send `sql` with the parameters `args` to the server without
            waiting for the result.
        """
        self.current_query = sql
        # Keep the parameters as well: a retry needs to bind them again.
        self.current_params = args
        self.cursor.execute(sql, args)

    def is_done(self):
        """ Check if the worker is free to take a new statement.

            Returns True when no statement is pending or the pending one
            has just completed, False otherwise. A statement that was
            aborted with a deadlock error (SQLSTATE 40P01) is sent again
            and the worker is reported as still busy.

            Raises RuntimeError when a rollback error carries no error
            code; any other rollback error is re-raised unchanged.
        """
        if self.current_query is None:
            return True

        try:
            if self.conn.poll() == psycopg2.extensions.POLL_OK:
                self.current_query = None
                self.current_params = None
                return True
        except psycopg2.extensions.TransactionRollbackError as e:
            if e.pgcode is None:
                raise RuntimeError("Postgres exception has no error code")
            if e.pgcode == '40P01':
                log.info("Deadlock detected, retry.")
                # Re-issue the remembered statement with its parameters.
                self.cursor.execute(self.current_query, self.current_params)
            else:
                raise

        return False
80
81
82
class Indexer(object):
    """ Main indexing routine.

        Reads the places to be indexed over a synchronous connection and
        distributes the per-place update statements over a pool of
        IndexingThread workers.
    """

    def __init__(self, options):
        """ Open the main connection and create `options.threads` workers.
        """
        self.options = options
        self.conn = make_connection(options)

        self.threads = []
        self.poll = select.poll()
        for i in range(options.threads):
            t = IndexingThread(i, options)
            self.threads.append(t)
            # POLLIN is the event flag that belongs to select.poll objects
            # (EPOLLIN is meant for select.epoll and is Linux-only).
            self.poll.register(t.conn.fileno(), select.POLLIN)
        self.next_thread = 0

    def _placex_ranks(self):
        """ Return the ranks below 30 to be indexed: from minrank up to and
            including maxrank, capped at 29.
        """
        return range(self.options.minrank,
                     min(self.options.maxrank + 1, 30))

    def run(self):
        """ Index all requested ranks in ascending order. When maxrank
            reaches 30, the interpolation lines are indexed before the
            places of rank 30.
        """
        log.info("Starting indexing rank ({} to {}) using {} threads".format(
                 self.options.minrank, self.options.maxrank,
                 self.options.threads))

        for rank in self._placex_ranks():
            self.index(RankRunner(rank))

        if self.options.maxrank >= 30:
            self.index(InterpolationRunner())
            self.index(RankRunner(30))

    def index(self, obj):
        """ Index everything described by the runner `obj`.

            `obj` must provide name() and the four SQL factories
            sql_index_sectors(), sql_nosector_places(),
            sql_sector_places() and sql_index_place().
        """
        log.info("Starting {}".format(obj.name()))

        cur = self.conn.cursor(name="main")
        cur.execute(obj.sql_index_sectors())

        # First pass over the sectors: sum up the per-sector counts.
        total_tuples = 0
        for r in cur:
            total_tuples += r[1]
        log.debug("Total number of rows; {}".format(total_tuples))

        # Rewind for the second pass, which does the actual work.
        cur.scroll(0, mode='absolute')

        done_tuples = 0
        rank_start_time = datetime.now()
        for r in cur:
            sector = r[0]

            # Should we do the remaining ones together?
            do_all = total_tuples - done_tuples < len(self.threads) * 1000

            pcur = self.conn.cursor(name='places')

            if do_all:
                pcur.execute(obj.sql_nosector_places())
            else:
                pcur.execute(obj.sql_sector_places(), (sector, ))

            for place in pcur:
                place_id = place[0]
                log.debug("Processing place {}".format(place_id))
                thread = self.find_free_thread()

                thread.perform(obj.sql_index_place(), (place_id,))
                done_tuples += 1

            pcur.close()

            # The query without sector covered all that was left.
            if do_all:
                break

        cur.close()

        # Let the statements that are still in flight finish.
        for t in self.threads:
            t.wait()

        rank_end_time = datetime.now()
        diff_seconds = (rank_end_time-rank_start_time).total_seconds()
        # Guard against a zero time difference when nothing was done.
        if diff_seconds > 0:
            rate = done_tuples/diff_seconds
        else:
            rate = 0

        log.info("Done {} in {} @ {} per second - FINISHED {}\n".format(
                 done_tuples, int(diff_seconds),
                 rate, obj.name()))

    def find_free_thread(self):
        """ Return a worker that can take a new statement, blocking until
            one becomes available.
        """
        while True:
            for t in self.threads:
                if t.is_done():
                    return t

            # All workers busy: sleep until a connection has data to read.
            self.poll.poll()
171
class RankRunner(object):
    """ Provides the SQL needed to index the rows of the placex table
        that have a given search rank.
    """

    def __init__(self, rank):
        # The rank is formatted directly into the SQL below; the callers
        # in this file always pass an int.
        self.rank = rank

    def name(self):
        """ Return a readable name of the work unit for log messages.
        """
        return "rank {}".format(self.rank)

    def sql_index_sectors(self):
        """ SQL returning (geometry_sector, row count) for all rows of
            this rank that still need indexing.
        """
        return """SELECT geometry_sector, count(*) FROM placex
                  WHERE rank_search = {} and indexed_status > 0
                  GROUP BY geometry_sector
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_nosector_places(self):
        """ SQL returning the place ids of all rows of this rank that
            still need indexing, irrespective of their sector.
        """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_sector_places(self):
        """ SQL returning the place ids of the rows of this rank in one
            sector that still need indexing. The sector is bound as the
            single query parameter.
        """
        # Restrict to this rank like the other queries do; without the
        # filter rows of every rank in the sector would be selected.
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and geometry_sector = %s
                  ORDER BY geometry_sector""".format(self.rank)

    def sql_index_place(self):
        """ SQL marking a single row as indexed. The place id is bound as
            the single query parameter.
        """
        return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
198
199
class InterpolationRunner(object):
    """ Provides the SQL needed to index the rows of the
        location_property_osmline table.
    """

    # Line separator of the multi-line SQL statements: a newline followed
    # by the 18 blanks that indent every continuation line.
    _SEP = "\n" + " " * 18

    def name(self):
        """ Return a readable name of the work unit for log messages.
        """
        return "interpolation lines (location_property_osmline)"

    def sql_index_sectors(self):
        """ SQL returning (geometry_sector, row count) for all rows that
            still need indexing.
        """
        return self._SEP.join((
            "SELECT geometry_sector, count(*) FROM location_property_osmline",
            "WHERE indexed_status > 0",
            "GROUP BY geometry_sector",
            "ORDER BY geometry_sector",
        ))

    def sql_nosector_places(self):
        """ SQL returning the place ids of all rows that still need
            indexing, irrespective of their sector.
        """
        return self._SEP.join((
            "SELECT place_id FROM location_property_osmline",
            "WHERE indexed_status > 0",
            "ORDER BY geometry_sector",
        ))

    def sql_sector_places(self):
        """ SQL returning the place ids of the rows in one sector that
            still need indexing. The sector is bound as the single query
            parameter.
        """
        return self._SEP.join((
            "SELECT place_id FROM location_property_osmline",
            "WHERE indexed_status > 0 and geometry_sector = %s",
            "ORDER BY geometry_sector",
        ))

    def sql_index_place(self):
        """ SQL marking a single row as indexed. The place id is bound as
            the single query parameter.
        """
        return self._SEP.join((
            "UPDATE location_property_osmline",
            "SET indexed_status = 0 WHERE place_id = %s",
        ))
224
225
def nominatim_arg_parser():
    """ Setup the command-line parser for the tool.

        Returns an ArgumentParser whose parsed namespace carries the
        attributes dbname, user, password_prompt, host, port, minrank,
        maxrank, threads and loglevel.
    """
    def positive_int(value):
        """ argparse type for integers that must be 1 or larger.
        """
        try:
            number = int(value)
        except ValueError:
            raise ArgumentTypeError("invalid int value: {!r}".format(value))
        if number < 1:
            raise ArgumentTypeError(
                "must be at least 1, got {}".format(number))
        return number

    p = ArgumentParser(description=__doc__,
                       formatter_class=RawDescriptionHelpFormatter)

    p.add_argument('-d', '--database',
                   dest='dbname', action='store', default='nominatim',
                   help='Name of the PostgreSQL database to connect to.')
    p.add_argument('-U', '--username',
                   dest='user', action='store',
                   help='PostgreSQL user name.')
    p.add_argument('-W', '--password',
                   dest='password_prompt', action='store_true',
                   help='Force password prompt.')
    p.add_argument('-H', '--host',
                   dest='host', action='store',
                   help='PostgreSQL server hostname or socket location.')
    p.add_argument('-P', '--port',
                   dest='port', action='store',
                   help='PostgreSQL server port')
    p.add_argument('-r', '--minrank',
                   dest='minrank', type=int, metavar='RANK', default=0,
                   help='Minimum/starting rank.')
    p.add_argument('-R', '--maxrank',
                   dest='maxrank', type=int, metavar='RANK', default=30,
                   help='Maximum/finishing rank.')
    # At least one worker is needed, otherwise there would never be a
    # free one to hand a place to.
    p.add_argument('-t', '--threads',
                   dest='threads', type=positive_int, metavar='NUM',
                   default=1,
                   help='Number of threads to create for indexing.')
    p.add_argument('-v', '--verbose',
                   dest='loglevel', action='count', default=0,
                   help='Increase verbosity')

    return p
264
if __name__ == '__main__':
    # Log to stderr, prefixing each message with its level.
    logging.basicConfig(format='%(levelname)s: %(message)s',
                        stream=sys.stderr)

    options = nominatim_arg_parser().parse_args(sys.argv[1:])

    # The level starts at 30 and drops by 10 for each -v given,
    # never going below 0.
    verbosity_steps = max(3 - options.loglevel, 0)
    log.setLevel(verbosity_steps * 10)

    # Only ask for a password when explicitly requested.
    if options.password_prompt:
        options.password = getpass.getpass("Database password: ")
    else:
        options.password = None

    Indexer(options).run()