]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
a4f4a62e7cdf262353a9917e2909db50ece46cad
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import select
32
33 from indexer.progress import ProgressLogger
34 from indexer.db import DBConnection, make_connection
35
36 log = logging.getLogger()
37
38 class RankRunner(object):
39     """ Returns SQL commands for indexing one rank within the placex table.
40     """
41
42     def __init__(self, rank):
43         self.rank = rank
44
45     def name(self):
46         return "rank {}".format(self.rank)
47
48     def sql_count_objects(self):
49         return """SELECT count(*) FROM placex
50                   WHERE rank_search = {} and indexed_status > 0
51                """.format(self.rank)
52
53     def sql_get_objects(self):
54         return """SELECT place_id FROM placex
55                   WHERE indexed_status > 0 and rank_search = {}
56                   ORDER BY geometry_sector""".format(self.rank)
57
58     def sql_index_place(self, ids):
59         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
60                .format(','.join((str(i) for i in ids)))
61
62
63 class InterpolationRunner(object):
64     """ Returns SQL commands for indexing the address interpolation table
65         location_property_osmline.
66     """
67
68     def name(self):
69         return "interpolation lines (location_property_osmline)"
70
71     def sql_count_objects(self):
72         return """SELECT count(*) FROM location_property_osmline
73                   WHERE indexed_status > 0"""
74
75     def sql_get_objects(self):
76         return """SELECT place_id FROM location_property_osmline
77                   WHERE indexed_status > 0
78                   ORDER BY geometry_sector"""
79
80     def sql_index_place(self, ids):
81         return """UPDATE location_property_osmline
82                   SET indexed_status = 0 WHERE place_id IN ({})"""\
83                .format(','.join((str(i) for i in ids)))
84
85 class BoundaryRunner(object):
86     """ Returns SQL commands for indexing the administrative boundaries
87         by partition.
88     """
89
90     def name(self):
91         return "boundaries"
92
93     def sql_count_objects(self):
94         return """SELECT count(*) FROM placex
95                   WHERE indexed_status > 0
96                     AND rank_search < 26
97                     AND class = 'boundary' and type = 'administrative'"""
98
99     def sql_get_objects(self):
100         return """SELECT place_id FROM placex
101                   WHERE indexed_status > 0 and rank_search < 26
102                         and class = 'boundary' and type = 'administrative'
103                   ORDER BY partition, admin_level"""
104
105     def sql_index_place(self, ids):
106         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
107                .format(','.join((str(i) for i in ids)))
108
109 class Indexer(object):
110     """ Main indexing routine.
111     """
112
113     def __init__(self, options):
114         self.minrank = max(0, options.minrank)
115         self.maxrank = min(30, options.maxrank)
116         self.conn = make_connection(options)
117         self.threads = [DBConnection(options) for i in range(options.threads)]
118
119     def index_boundaries(self):
120         log.warning("Starting indexing boundaries using {} threads".format(
121                       len(self.threads)))
122
123         self.index(BoundaryRunner())
124
125     def index_by_rank(self):
126         """ Run classic indexing by rank.
127         """
128         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
129                  self.minrank, self.maxrank, len(self.threads)))
130
131         for rank in range(self.minrank, self.maxrank):
132             self.index(RankRunner(rank))
133
134         if self.maxrank == 30:
135             self.index(InterpolationRunner(), 20)
136
137         self.index(RankRunner(self.maxrank), 20)
138
139     def index(self, obj, batch=1):
140         """ Index a single rank or table. `obj` describes the SQL to use
141             for indexing. `batch` describes the number of objects that
142             should be processed with a single SQL statement
143         """
144         log.warning("Starting {}".format(obj.name()))
145
146         cur = self.conn.cursor()
147         cur.execute(obj.sql_count_objects())
148
149         total_tuples = cur.fetchone()[0]
150         log.debug("Total number of rows: {}".format(total_tuples))
151
152         cur.close()
153
154         next_thread = self.find_free_thread()
155         progress = ProgressLogger(obj.name(), total_tuples)
156
157         cur = self.conn.cursor(name='places')
158         cur.execute(obj.sql_get_objects())
159
160         while True:
161             places = [p[0] for p in cur.fetchmany(batch)]
162             if len(places) == 0:
163                 break
164
165             log.debug("Processing places: {}".format(places))
166             thread = next(next_thread)
167
168             thread.perform(obj.sql_index_place(places))
169             progress.add(len(places))
170
171         cur.close()
172
173         for t in self.threads:
174             t.wait()
175
176         progress.done()
177
178     def find_free_thread(self):
179         """ Generator that returns the next connection that is free for
180             sending a query.
181         """
182         ready = self.threads
183         command_stat = 0
184
185         while True:
186             for thread in ready:
187                 if thread.is_done():
188                     command_stat += 1
189                     yield thread
190
191             # refresh the connections occasionaly to avoid potential
192             # memory leaks in Postgresql.
193             if command_stat > 100000:
194                 for t in self.threads:
195                     while not t.is_done():
196                         t.wait()
197                     t.connect()
198                 command_stat = 0
199                 ready = self.threads
200             else:
201                 ready, _, _ = select.select(self.threads, [], [])
202
203         assert False, "Unreachable code"
204
205
206 def nominatim_arg_parser():
207     """ Setup the command-line parser for the tool.
208     """
209     def h(s):
210         return re.sub("\s\s+" , " ", s)
211
212     p = ArgumentParser(description="Indexing tool for Nominatim.",
213                        formatter_class=RawDescriptionHelpFormatter)
214
215     p.add_argument('-d', '--database',
216                    dest='dbname', action='store', default='nominatim',
217                    help='Name of the PostgreSQL database to connect to.')
218     p.add_argument('-U', '--username',
219                    dest='user', action='store',
220                    help='PostgreSQL user name.')
221     p.add_argument('-W', '--password',
222                    dest='password_prompt', action='store_true',
223                    help='Force password prompt.')
224     p.add_argument('-H', '--host',
225                    dest='host', action='store',
226                    help='PostgreSQL server hostname or socket location.')
227     p.add_argument('-P', '--port',
228                    dest='port', action='store',
229                    help='PostgreSQL server port')
230     p.add_argument('-b', '--boundary-only',
231                    dest='boundary_only', action='store_true',
232                    help='Only index administrative boundaries (ignores min/maxrank).')
233     p.add_argument('-r', '--minrank',
234                    dest='minrank', type=int, metavar='RANK', default=0,
235                    help='Minimum/starting rank.')
236     p.add_argument('-R', '--maxrank',
237                    dest='maxrank', type=int, metavar='RANK', default=30,
238                    help='Maximum/finishing rank.')
239     p.add_argument('-t', '--threads',
240                    dest='threads', type=int, metavar='NUM', default=1,
241                    help='Number of threads to create for indexing.')
242     p.add_argument('-v', '--verbose',
243                    dest='loglevel', action='count', default=0,
244                    help='Increase verbosity')
245
246     return p
247
248 if __name__ == '__main__':
249     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
250
251     options = nominatim_arg_parser().parse_args(sys.argv[1:])
252
253     log.setLevel(max(3 - options.loglevel, 0) * 10)
254
255     options.password = None
256     if options.password_prompt:
257         password = getpass.getpass("Database password: ")
258         options.password = password
259
260     if options.boundary_only:
261         Indexer(options).index_boundaries()
262     else:
263         Indexer(options).index_by_rank()