]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
do not run rank 0 objects in parallel
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import select
32
33 from indexer.progress import ProgressLogger
34 from indexer.db import DBConnection, make_connection
35
36 log = logging.getLogger()
37
38 class RankRunner(object):
39     """ Returns SQL commands for indexing one rank within the placex table.
40     """
41
42     def __init__(self, rank):
43         self.rank = rank
44
45     def name(self):
46         return "rank {}".format(self.rank)
47
48     def sql_count_objects(self):
49         return """SELECT count(*) FROM placex
50                   WHERE rank_address = {} and indexed_status > 0
51                """.format(self.rank)
52
53     def sql_get_objects(self):
54         return """SELECT place_id FROM placex
55                   WHERE indexed_status > 0 and rank_address = {}
56                   ORDER BY geometry_sector""".format(self.rank)
57
58     def sql_index_place(self, ids):
59         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
60                .format(','.join((str(i) for i in ids)))
61
62
63 class InterpolationRunner(object):
64     """ Returns SQL commands for indexing the address interpolation table
65         location_property_osmline.
66     """
67
68     def name(self):
69         return "interpolation lines (location_property_osmline)"
70
71     def sql_count_objects(self):
72         return """SELECT count(*) FROM location_property_osmline
73                   WHERE indexed_status > 0"""
74
75     def sql_get_objects(self):
76         return """SELECT place_id FROM location_property_osmline
77                   WHERE indexed_status > 0
78                   ORDER BY geometry_sector"""
79
80     def sql_index_place(self, ids):
81         return """UPDATE location_property_osmline
82                   SET indexed_status = 0 WHERE place_id IN ({})"""\
83                .format(','.join((str(i) for i in ids)))
84
85 class BoundaryRunner(object):
86     """ Returns SQL commands for indexing the administrative boundaries
87         of a certain rank.
88     """
89
90     def __init__(self, rank):
91         self.rank = rank
92
93     def name(self):
94         return "boundaries rank {}".format(self.rank)
95
96     def sql_count_objects(self):
97         return """SELECT count(*) FROM placex
98                   WHERE indexed_status > 0
99                     AND rank_search = {}
100                     AND class = 'boundary' and type = 'administrative'""".format(self.rank)
101
102     def sql_get_objects(self):
103         return """SELECT place_id FROM placex
104                   WHERE indexed_status > 0 and rank_search = {}
105                         and class = 'boundary' and type = 'administrative'
106                   ORDER BY partition, admin_level""".format(self.rank)
107
108     def sql_index_place(self, ids):
109         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
110                .format(','.join((str(i) for i in ids)))
111
112 class Indexer(object):
113     """ Main indexing routine.
114     """
115
116     def __init__(self, options):
117         self.minrank = max(1, options.minrank)
118         self.maxrank = min(30, options.maxrank)
119         self.conn = make_connection(options)
120         self.threads = [DBConnection(options) for i in range(options.threads)]
121
122     def index_boundaries(self):
123         log.warning("Starting indexing boundaries using {} threads".format(
124                       len(self.threads)))
125
126         for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
127             self.index(BoundaryRunner(rank))
128
129     def index_by_rank(self):
130         """ Run classic indexing by rank.
131         """
132         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
133                  self.minrank, self.maxrank, len(self.threads)))
134
135         for rank in range(max(1, self.minrank), self.maxrank):
136             self.index(RankRunner(rank))
137
138
139         if self.maxrank == 30:
140             self.index(RankRunner(0))
141             self.index(InterpolationRunner(), 20)
142
143         self.index(RankRunner(self.maxrank), 20)
144
145     def index(self, obj, batch=1):
146         """ Index a single rank or table. `obj` describes the SQL to use
147             for indexing. `batch` describes the number of objects that
148             should be processed with a single SQL statement
149         """
150         log.warning("Starting {}".format(obj.name()))
151
152         cur = self.conn.cursor()
153         cur.execute(obj.sql_count_objects())
154
155         total_tuples = cur.fetchone()[0]
156         log.debug("Total number of rows: {}".format(total_tuples))
157
158         cur.close()
159
160         progress = ProgressLogger(obj.name(), total_tuples)
161
162         if total_tuples > 0:
163             cur = self.conn.cursor(name='places')
164             cur.execute(obj.sql_get_objects())
165
166             next_thread = self.find_free_thread()
167             while True:
168                 places = [p[0] for p in cur.fetchmany(batch)]
169                 if len(places) == 0:
170                     break
171
172                 log.debug("Processing places: {}".format(places))
173                 thread = next(next_thread)
174
175                 thread.perform(obj.sql_index_place(places))
176                 progress.add(len(places))
177
178             cur.close()
179
180             for t in self.threads:
181                 t.wait()
182
183         progress.done()
184
185     def find_free_thread(self):
186         """ Generator that returns the next connection that is free for
187             sending a query.
188         """
189         ready = self.threads
190         command_stat = 0
191
192         while True:
193             for thread in ready:
194                 if thread.is_done():
195                     command_stat += 1
196                     yield thread
197
198             # refresh the connections occasionaly to avoid potential
199             # memory leaks in Postgresql.
200             if command_stat > 100000:
201                 for t in self.threads:
202                     while not t.is_done():
203                         t.wait()
204                     t.connect()
205                 command_stat = 0
206                 ready = self.threads
207             else:
208                 ready, _, _ = select.select(self.threads, [], [])
209
210         assert False, "Unreachable code"
211
212
213 def nominatim_arg_parser():
214     """ Setup the command-line parser for the tool.
215     """
216     def h(s):
217         return re.sub("\s\s+" , " ", s)
218
219     p = ArgumentParser(description="Indexing tool for Nominatim.",
220                        formatter_class=RawDescriptionHelpFormatter)
221
222     p.add_argument('-d', '--database',
223                    dest='dbname', action='store', default='nominatim',
224                    help='Name of the PostgreSQL database to connect to.')
225     p.add_argument('-U', '--username',
226                    dest='user', action='store',
227                    help='PostgreSQL user name.')
228     p.add_argument('-W', '--password',
229                    dest='password_prompt', action='store_true',
230                    help='Force password prompt.')
231     p.add_argument('-H', '--host',
232                    dest='host', action='store',
233                    help='PostgreSQL server hostname or socket location.')
234     p.add_argument('-P', '--port',
235                    dest='port', action='store',
236                    help='PostgreSQL server port')
237     p.add_argument('-b', '--boundary-only',
238                    dest='boundary_only', action='store_true',
239                    help='Only index administrative boundaries (ignores min/maxrank).')
240     p.add_argument('-r', '--minrank',
241                    dest='minrank', type=int, metavar='RANK', default=0,
242                    help='Minimum/starting rank.')
243     p.add_argument('-R', '--maxrank',
244                    dest='maxrank', type=int, metavar='RANK', default=30,
245                    help='Maximum/finishing rank.')
246     p.add_argument('-t', '--threads',
247                    dest='threads', type=int, metavar='NUM', default=1,
248                    help='Number of threads to create for indexing.')
249     p.add_argument('-v', '--verbose',
250                    dest='loglevel', action='count', default=0,
251                    help='Increase verbosity')
252
253     return p
254
255 if __name__ == '__main__':
256     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
257
258     options = nominatim_arg_parser().parse_args(sys.argv[1:])
259
260     log.setLevel(max(3 - options.loglevel, 0) * 10)
261
262     options.password = None
263     if options.password_prompt:
264         password = getpass.getpass("Database password: ")
265         options.password = password
266
267     if options.boundary_only:
268         Indexer(options).index_boundaries()
269     else:
270         Indexer(options).index_by_rank()