]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/nominatim.py
67cd42ee9bde66bbfe199245be6abc5989b3ef99
[nominatim.git] / nominatim / nominatim.py
1 #! /usr/bin/env python3
2 #-----------------------------------------------------------------------------
3 # nominatim - [description]
4 #-----------------------------------------------------------------------------
5 #
6 # Indexing tool for the Nominatim database.
7 #
8 # Based on C version by Brian Quinion
9 #
10 # This program is free software; you can redistribute it and/or
11 # modify it under the terms of the GNU General Public License
12 # as published by the Free Software Foundation; either version 2
13 # of the License, or (at your option) any later version.
14 #
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 # GNU General Public License for more details.
19 #
20 # You should have received a copy of the GNU General Public License
21 # along with this program; if not, write to the Free Software
22 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
23 #-----------------------------------------------------------------------------
24
25 from argparse import ArgumentParser, RawDescriptionHelpFormatter, ArgumentTypeError
26 import logging
27 import sys
28 import re
29 import getpass
30 from datetime import datetime
31 import select
32
33 from indexer.progress import ProgressLogger
34 from indexer.db import DBConnection, make_connection
35
36 log = logging.getLogger()
37
38 class RankRunner(object):
39     """ Returns SQL commands for indexing one rank within the placex table.
40     """
41
42     def __init__(self, rank):
43         self.rank = rank
44
45     def name(self):
46         return "rank {}".format(self.rank)
47
48     def sql_count_objects(self):
49         return """SELECT count(*) FROM placex
50                   WHERE rank_search = {} and indexed_status > 0
51                """.format(self.rank)
52
53     def sql_get_objects(self):
54         return """SELECT place_id FROM placex
55                   WHERE indexed_status > 0 and rank_search = {}
56                   ORDER BY geometry_sector""".format(self.rank)
57
58     def sql_index_place(self, ids):
59         return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\
60                .format(','.join((str(i) for i in ids)))
61
62
63 class InterpolationRunner(object):
64     """ Returns SQL commands for indexing the address interpolation table
65         location_property_osmline.
66     """
67
68     def name(self):
69         return "interpolation lines (location_property_osmline)"
70
71     def sql_count_objects(self):
72         return """SELECT count(*) FROM location_property_osmline
73                   WHERE indexed_status > 0"""
74
75     def sql_get_objects(self):
76         return """SELECT place_id FROM location_property_osmline
77                   WHERE indexed_status > 0
78                   ORDER BY geometry_sector"""
79
80     def sql_index_place(self, ids):
81         return """UPDATE location_property_osmline
82                   SET indexed_status = 0 WHERE place_id IN ({})"""\
83                .format(','.join((str(i) for i in ids)))
84
85
86 class Indexer(object):
87     """ Main indexing routine.
88     """
89
90     def __init__(self, options):
91         self.minrank = max(0, options.minrank)
92         self.maxrank = min(30, options.maxrank)
93         self.conn = make_connection(options)
94         self.threads = [DBConnection(options) for i in range(options.threads)]
95
96     def run(self):
97         """ Run indexing over the entire database.
98         """
99         log.warning("Starting indexing rank ({} to {}) using {} threads".format(
100                  self.minrank, self.maxrank, len(self.threads)))
101
102         for rank in range(self.minrank, self.maxrank):
103             self.index(RankRunner(rank))
104
105         if self.maxrank == 30:
106             self.index(InterpolationRunner(), 20)
107
108         self.index(RankRunner(self.maxrank), 20)
109
110     def index(self, obj, batch=1):
111         """ Index a single rank or table. `obj` describes the SQL to use
112             for indexing. `batch` describes the number of objects that
113             should be processed with a single SQL statement
114         """
115         log.warning("Starting {}".format(obj.name()))
116
117         cur = self.conn.cursor()
118         cur.execute(obj.sql_count_objects())
119
120         total_tuples = cur.fetchone()[0]
121         log.debug("Total number of rows: {}".format(total_tuples))
122
123         cur.close()
124
125         next_thread = self.find_free_thread()
126         progress = ProgressLogger(obj.name(), total_tuples)
127
128         cur = self.conn.cursor(name='places')
129         cur.execute(obj.sql_get_objects())
130
131         while True:
132             places = [p[0] for p in cur.fetchmany(batch)]
133             if len(places) == 0:
134                 break
135
136             log.debug("Processing places: {}".format(places))
137             thread = next(next_thread)
138
139             thread.perform(obj.sql_index_place(places))
140             progress.add(len(places))
141
142         cur.close()
143
144         for t in self.threads:
145             t.wait()
146
147         progress.done()
148
149     def find_free_thread(self):
150         """ Generator that returns the next connection that is free for
151             sending a query.
152         """
153         ready = self.threads
154         command_stat = 0
155
156         while True:
157             for thread in ready:
158                 if thread.is_done():
159                     command_stat += 1
160                     yield thread
161
162             # refresh the connections occasionaly to avoid potential
163             # memory leaks in Postgresql.
164             if command_stat > 100000:
165                 for t in self.threads:
166                     while not t.is_done():
167                         t.wait()
168                     t.connect()
169                 command_stat = 0
170                 ready = self.threads
171             else:
172                 ready, _, _ = select.select(self.threads, [], [])
173
174         assert False, "Unreachable code"
175
176
177 def nominatim_arg_parser():
178     """ Setup the command-line parser for the tool.
179     """
180     def h(s):
181         return re.sub("\s\s+" , " ", s)
182
183     p = ArgumentParser(description="Indexing tool for Nominatim.",
184                        formatter_class=RawDescriptionHelpFormatter)
185
186     p.add_argument('-d', '--database',
187                    dest='dbname', action='store', default='nominatim',
188                    help='Name of the PostgreSQL database to connect to.')
189     p.add_argument('-U', '--username',
190                    dest='user', action='store',
191                    help='PostgreSQL user name.')
192     p.add_argument('-W', '--password',
193                    dest='password_prompt', action='store_true',
194                    help='Force password prompt.')
195     p.add_argument('-H', '--host',
196                    dest='host', action='store',
197                    help='PostgreSQL server hostname or socket location.')
198     p.add_argument('-P', '--port',
199                    dest='port', action='store',
200                    help='PostgreSQL server port')
201     p.add_argument('-r', '--minrank',
202                    dest='minrank', type=int, metavar='RANK', default=0,
203                    help='Minimum/starting rank.')
204     p.add_argument('-R', '--maxrank',
205                    dest='maxrank', type=int, metavar='RANK', default=30,
206                    help='Maximum/finishing rank.')
207     p.add_argument('-t', '--threads',
208                    dest='threads', type=int, metavar='NUM', default=1,
209                    help='Number of threads to create for indexing.')
210     p.add_argument('-v', '--verbose',
211                    dest='loglevel', action='count', default=0,
212                    help='Increase verbosity')
213
214     return p
215
216 if __name__ == '__main__':
217     logging.basicConfig(stream=sys.stderr, format='%(levelname)s: %(message)s')
218
219     options = nominatim_arg_parser().parse_args(sys.argv[1:])
220
221     log.setLevel(max(3 - options.loglevel, 0) * 10)
222
223     options.password = None
224     if options.password_prompt:
225         password = getpass.getpass("Database password: ")
226         options.password = password
227
228     Indexer(options).run()