]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
Merge pull request #2197 from lonvia/use-jinja-for-sql-preprocessing
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 # pylint: disable=C0111
5 import logging
6 import select
7
8 import psycopg2
9
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
12
13 LOG = logging.getLogger()
14
class RankRunner:
    """ SQL provider for indexing the placex rows of a single address rank.

        The methods only build SQL text, they never execute anything.
    """

    def __init__(self, rank):
        # Address rank that every generated statement is restricted to.
        self.rank = rank

    def name(self):
        """ Return the label describing this runner.
        """
        label = "rank {}"
        return label.format(self.rank)

    def sql_count_objects(self):
        """ Return the query that counts the placex rows of this rank
            with an indexed_status greater than 0.
        """
        query = """SELECT count(*) FROM placex
                  WHERE rank_address = {} and indexed_status > 0
               """
        return query.format(self.rank)

    def sql_get_objects(self):
        """ Return the query that lists the place_ids of this rank with an
            indexed_status greater than 0, ordered by geometry_sector.
        """
        query = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_address = {}
                  ORDER BY geometry_sector"""
        return query.format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ Return the statement that sets indexed_status to 0 for the
            placex rows with the given place ids.
        """
        id_list = ','.join(map(str, ids))
        statement = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return statement.format(id_list)
39
40
class InterpolationRunner:
    """ SQL provider for indexing the address interpolation table
        location_property_osmline.

        The methods only build SQL text, they never execute anything.
    """

    @staticmethod
    def name():
        """ Return the label describing this runner.
        """
        label = "interpolation lines (location_property_osmline)"
        return label

    @staticmethod
    def sql_count_objects():
        """ Return the query that counts the interpolation rows with an
            indexed_status greater than 0.
        """
        query = """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""
        return query

    @staticmethod
    def sql_get_objects():
        """ Return the query that lists the place_ids of interpolation rows
            with an indexed_status greater than 0, ordered by geometry_sector.
        """
        query = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""
        return query

    @staticmethod
    def sql_index_place(ids):
        """ Return the statement that sets indexed_status to 0 for the
            interpolation rows with the given place ids.
        """
        id_list = ','.join(map(str, ids))
        statement = """UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id IN ({})
               """
        return statement.format(id_list)
66
class BoundaryRunner:
    """ SQL provider for indexing the administrative boundaries of a
        single search rank.

        The methods only build SQL text, they never execute anything.
    """

    def __init__(self, rank):
        # Search rank that every generated query is restricted to.
        self.rank = rank

    def name(self):
        """ Return the label describing this runner.
        """
        label = "boundaries rank {}"
        return label.format(self.rank)

    def sql_count_objects(self):
        """ Return the query that counts the administrative boundaries of
            this rank with an indexed_status greater than 0.
        """
        query = """SELECT count(*) FROM placex
                  WHERE indexed_status > 0
                    AND rank_search = {}
                    AND class = 'boundary' and type = 'administrative'
               """
        return query.format(self.rank)

    def sql_get_objects(self):
        """ Return the query that lists the place_ids of the administrative
            boundaries of this rank with an indexed_status greater than 0,
            ordered by partition and admin_level.
        """
        query = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and class = 'boundary' and type = 'administrative'
                  ORDER BY partition, admin_level
               """
        return query.format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ Return the statement that sets indexed_status to 0 for the
            placex rows with the given place ids.
        """
        id_list = ','.join(map(str, ids))
        statement = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return statement.format(id_list)
96
97
class PostcodeRunner:
    """ SQL provider for indexing the location_postcode table.

        The methods only build SQL text, they never execute anything.
    """

    @staticmethod
    def name():
        """ Return the label describing this runner.
        """
        label = "postcodes (location_postcode)"
        return label

    @staticmethod
    def sql_count_objects():
        """ Return the query that counts the postcode rows with an
            indexed_status greater than 0.
        """
        query = 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'
        return query

    @staticmethod
    def sql_get_objects():
        """ Return the query that lists the place_ids of postcode rows with
            an indexed_status greater than 0, ordered by country_code and
            postcode.
        """
        query = """SELECT place_id FROM location_postcode
                  WHERE indexed_status > 0
                  ORDER BY country_code, postcode"""
        return query

    @staticmethod
    def sql_index_place(ids):
        """ Return the statement that sets indexed_status to 0 for the
            postcode rows with the given place ids.
        """
        id_list = ','.join(map(str, ids))
        statement = """UPDATE location_postcode SET indexed_status = 0
                  WHERE place_id IN ({})
               """
        return statement.format(id_list)
121
122
def _analyse_db_if(conn, condition):
    """ Execute 'ANALYSE' on the connection `conn` when `condition` is
        truthy. Otherwise do nothing.
    """
    if not condition:
        return

    with conn.cursor() as analyse_cur:
        analyse_cur.execute('ANALYSE')
127
128
class Indexer:
    """ Main indexing routine.

        `self.conn` is a regular psycopg2 connection that is used to count
        and to stream the ids of the objects to index. `self.threads` is a
        list of DBConnection workers (from db.async_connection) that receive
        the actual UPDATE statements. Both are only open while one of the
        index_* functions is running.
    """

    def __init__(self, dsn, num_threads):
        self.dsn = dsn
        self.num_threads = num_threads
        self.conn = None
        self.threads = []

    def _setup_connections(self):
        """ Open the main connection and `num_threads` worker connections.
        """
        self.conn = psycopg2.connect(self.dsn)

        # Register the workers one by one. When opening one of them fails,
        # the ones already opened stay reachable for _close_connections().
        self.threads = []
        for _ in range(self.num_threads):
            self.threads.append(DBConnection(self.dsn))

    def _close_connections(self):
        """ Close the main connection and all worker connections that are
            currently registered. Safe to call when nothing is open.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

        for thread in self.threads:
            thread.close()
        self.threads = []

    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        # Separate autocommit connection that is only used for ANALYSE.
        conn = psycopg2.connect(self.dsn)
        conn.autocommit = True

        try:
            self.index_by_rank(0, 4)
            _analyse_db_if(conn, analyse)

            self.index_boundaries(0, 30)
            _analyse_db_if(conn, analyse)

            self.index_by_rank(5, 25)
            _analyse_db_if(conn, analyse)

            self.index_by_rank(26, 30)
            _analyse_db_if(conn, analyse)

            self.index_postcodes()
            _analyse_db_if(conn, analyse)
        finally:
            conn.close()

    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.

            The ranks processed are max(minrank, 4) up to, but not
            including, min(maxrank, 26).
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        # Connections are opened inside the try block so that a partially
        # completed setup is cleaned up as well.
        try:
            self._setup_connections()

            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self.index(BoundaryRunner(rank))
        finally:
            self._close_connections()

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        # Connections are opened inside the try block so that a partially
        # completed setup is cleaned up as well.
        try:
            self._setup_connections()

            # Rank 0 is never part of this loop, it is only handled
            # together with rank 30 below.
            for rank in range(max(1, minrank), maxrank):
                self.index(RankRunner(rank))

            if maxrank == 30:
                self.index(RankRunner(0))
                self.index(InterpolationRunner(), 20)
                self.index(RankRunner(30), 20)
            else:
                self.index(RankRunner(maxrank))
        finally:
            self._close_connections()

    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        # Connections are opened inside the try block so that a partially
        # completed setup is cleaned up as well.
        try:
            self._setup_connections()

            self.index(PostcodeRunner(), 20)
        finally:
            self._close_connections()

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        conn = psycopg2.connect(self.dsn)

        try:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()
        finally:
            conn.close()

    def index(self, obj, batch=1):
        """ Index a single rank or table. `obj` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.

            The connections must have been opened with _setup_connections()
            before this function is called.
        """
        LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)

        # The cursors are context-managed so that they are closed even
        # when one of the statements raises.
        with self.conn.cursor() as cur:
            cur.execute(obj.sql_count_objects())

            total_tuples = cur.fetchone()[0]
            LOG.debug("Total number of rows: %i", total_tuples)

        progress = ProgressLogger(obj.name(), total_tuples)

        if total_tuples > 0:
            # A named cursor, so the ids can be fetched in batches.
            with self.conn.cursor(name='places') as cur:
                cur.execute(obj.sql_get_objects())

                next_thread = self.find_free_thread()
                while True:
                    places = [p[0] for p in cur.fetchmany(batch)]
                    if not places:
                        break

                    LOG.debug("Processing places: %s", places)
                    thread = next(next_thread)

                    thread.perform(obj.sql_index_place(places))
                    progress.add(len(places))

            # Let every worker finish its last statement before the
            # result is reported.
            for thread in self.threads:
                thread.wait()

        progress.done()

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.

            The generator never terminates. The caller simply stops
            requesting connections when there is no more work.
        """
        ready = self.threads
        command_stat = 0

        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            # refresh the connections occasionally to avoid potential
            # memory leaks in Postgresql.
            if command_stat > 100000:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                # Block until at least one worker connection is reported
                # readable. Presumably DBConnection provides fileno() for
                # this - confirm in db.async_connection.
                ready, _, _ = select.select(self.threads, [], [])

        assert False, "Unreachable code"