]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
indexer: move progress tracker into separate class
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import psycopg2
32 from psycopg2.extras import wait_select
33 import select
34
35 from indexer.progress import ProgressLogger
36
37 log = logging.getLogger()
38
39 def make_connection(options, asynchronous=False):
40     params = {'dbname' : options.dbname,
41               'user' : options.user,
42               'password' : options.password,
43               'host' : options.host,
44               'port' : options.port,
45               'async' : asynchronous}
46
47     return psycopg2.connect(**params)
48
49
50 class RankRunner(object):
51     """ Returns SQL commands for indexing one rank within the placex table.
52     """
53
54     def __init__(self, rank):
55         self.rank = rank
56
57     def name(self):
58         return "rank {}".format(self.rank)
59
60     def sql_count_objects(self):
61         return """SELECT count(*) FROM placex
62                   WHERE rank_search = {} and indexed_status > 0
63                """.format(self.rank)
64
65     def sql_get_objects(self):
66         return """SELECT place_id FROM placex
67                   WHERE indexed_status > 0 and rank_search = {}
68                   ORDER BY geometry_sector""".format(self.rank)
69
70     def sql_index_place(self):
71         return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s"
72
73
74 class InterpolationRunner(object):
75     """ Returns SQL commands for indexing the address interpolation table
76         location_property_osmline.
77     """
78
79     def name(self):
80         return "interpolation lines (location_property_osmline)"
81
82     def sql_count_objects(self):
83         return """SELECT count(*) FROM location_property_osmline
84                   WHERE indexed_status > 0"""
85
86     def sql_get_objects(self):
87         return """SELECT place_id FROM location_property_osmline
88                   WHERE indexed_status > 0
89                   ORDER BY geometry_sector"""
90
91     def sql_index_place(self):
92         return """UPDATE location_property_osmline
93                   SET indexed_status = 0 WHERE place_id = %s"""
94
95
96 class DBConnection(object):
97     """ A single non-blocking database connection.
98     """
99
100     def __init__(self, options):
101         self.current_query = None
102         self.current_params = None
103
104         self.conn = None
105         self.connect()
106
107     def connect(self):
108         if self.conn is not None:
109             self.cursor.close()
110             self.conn.close()
111
112         self.conn = make_connection(options, asynchronous=True)
113         self.wait()
114
115         self.cursor = self.conn.cursor()
116         # Disable JIT and parallel workers as they are known to cause problems.
117         # Update pg_settings instead of using SET because it does not yield
118         # errors on older versions of Postgres where the settings are not
119         # implemented.
120         self.perform(
121             """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
122                 UPDATE pg_settings SET setting = 0 
123                    WHERE name = 'max_parallel_workers_per_gather';""")
124         self.wait()
125
126     def wait(self):
127         """ Block until any pending operation is done.
128         """
129         while True:
130             try:
131                 wait_select(self.conn)
132                 self.current_query = None
133                 return
134             except psycopg2.extensions.TransactionRollbackError as e:
135                 if e.pgcode == '40P01':
136                     log.info("Deadlock detected (params = {}), retry."
137                               .format(self.current_params))
138                     self.cursor.execute(self.current_query, self.current_params)
139                 else:
140                     raise
141             except psycopg2.errors.DeadlockDetected:
142                 self.cursor.execute(self.current_query, self.current_params)
143
144     def perform(self, sql, args=None):
145         """ Send SQL query to the server. Returns immediately without
146             blocking.
147         """
148         self.current_query = sql
149         self.current_params = args
150         self.cursor.execute(sql, args)
151
152     def fileno(self):
153         """ File descriptor to wait for. (Makes this class select()able.)
154         """
155         return self.conn.fileno()
156
157     def is_done(self):
158         """ Check if the connection is available for a new query.
159
160             Also checks if the previous query has run into a deadlock.
161             If so, then the previous query is repeated.
162         """
163         if self.current_query is None:
164             return True
165
166         try:
167             if self.conn.poll() == psycopg2.extensions.POLL_OK:
168                 self.current_query = None
169                 return True
170         except psycopg2.extensions.TransactionRollbackError as e:
171             if e.pgcode == '40P01':
172                 log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
173                 self.cursor.execute(self.current_query, self.current_params)
174             else:
175                 raise
176         except psycopg2.errors.DeadlockDetected:
177             self.cursor.execute(self.current_query, self.current_params)
178
179         return False
180
181
182 class Indexer(object):
183     """ Main indexing routine.
184     """
185
186     def __init__(self, options):
187         self.minrank = max(0, options.minrank)
188         self.maxrank = min(30, options.maxrank)
189         self.conn = make_connection(options)
190         self.threads = [DBConnection(options) for i in range(options.threads)]
191
192     def run(self):
193         """ Run indexing over the entire database.
194         """
195         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
196                  self.minrank, self.maxrank, len(self.threads)))
197
198         for rank in range(self.minrank, self.maxrank):
199             self.index(RankRunner(rank))
200
201         if self.maxrank == 30:
202             self.index(InterpolationRunner())
203
204         self.index(RankRunner(self.maxrank))
205
206     def index(self, obj):
207         """ Index a single rank or table. `obj` describes the SQL to use
208             for indexing.
209         """
210         log.warning("Starting {}".format(obj.name()))
211
212         cur = self.conn.cursor()
213         cur.execute(obj.sql_count_objects())
214
215         total_tuples = cur.fetchone()[0]
216         log.debug("Total number of rows: {}".format(total_tuples))
217
218         cur.close()
219
220         next_thread = self.find_free_thread()
221         progress = ProgressLogger(obj.name(), total_tuples)
222
223         cur = self.conn.cursor(name='places')
224         cur.execute(obj.sql_get_objects())
225
226         for place in cur:
227             place_id = place[0]
228             log.debug("Processing place {}".format(place_id))
229             thread = next(next_thread)
230
231             thread.perform(obj.sql_index_place(), (place_id,))
232             progress.add()
233
234         cur.close()
235
236         for t in self.threads:
237             t.wait()
238
239         progress.done()
240
241     def find_free_thread(self):
242         """ Generator that returns the next connection that is free for
243             sending a query.
244         """
245         ready = self.threads
246         command_stat = 0
247
248         while True:
249             for thread in ready:
250                 if thread.is_done():
251                     command_stat += 1
252                     yield thread
253
254             # refresh the connections occasionaly to avoid potential
255             # memory leaks in Postgresql.
256             if command_stat > 100000:
257                 for t in self.threads:
258                     while not t.is_done():
259                         t.wait()
260                     t.connect()
261                 command_stat = 0
262                 ready = self.threads
263             else:
264                 ready, _, _ = select.select(self.threads, [], [])
265
266         assert False, "Unreachable code"
267
268
269 def nominatim_arg_parser():
270     """ Setup the command-line parser for the tool.
271     """
272     def h(s):
273         return re.sub("\s\s+" , " ", s)
274
275     p = ArgumentParser(description="Indexing tool for Nominatim.",
276                        formatter_class=RawDescriptionHelpFormatter)
277
278     p.add_argument('-d', '--database',
279                    dest='dbname', action='store', default='nominatim',
280                    help='Name of the PostgreSQL database to connect to.')
281     p.add_argument('-U', '--username',
282                    dest='user', action='store',
283                    help='PostgreSQL user name.')
284     p.add_argument('-W', '--password',
285                    dest='password_prompt', action='store_true',
286                    help='Force password prompt.')
287     p.add_argument('-H', '--host',
288                    dest='host', action='store',
289                    help='PostgreSQL server hostname or socket location.')
290     p.add_argument('-P', '--port',
291                    dest='port', action='store',
292                    help='PostgreSQL server port')
293     p.add_argument('-r', '--minrank',
294                    dest='minrank', type=int, metavar='RANK', default=0,
295                    help='Minimum/starting rank.')
296     p.add_argument('-R', '--maxrank',
297                    dest='maxrank', type=int, metavar='RANK', default=30,
298                    help='Maximum/finishing rank.')
299     p.add_argument('-t', '--threads',
300                    dest='threads', type=int, metavar='NUM', default=1,
301                    help='Number of threads to create for indexing.')
302     p.add_argument('-v', '--verbose',
303                    dest='loglevel', action='count', default=0,
304                    help='Increase verbosity')
305
306     return p
307
308 if __name__ == '__main__':
309     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
310
311     options = nominatim_arg_parser().parse_args(sys.argv[1:])
312
313     log.setLevel(max(3 - options.loglevel, 0) * 10)
314
315     options.password = None
316     if options.password_prompt:
317         password = getpass.getpass("Database password: ")
318         options.password = password
319
320     Indexer(options).run()