git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
properly close connections of indexer after use
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 # pylint: disable=C0111
5 import logging
6 import select
7
8 import psycopg2
9
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
12
# Logger used by the indexer. logging.getLogger() without a name returns the
# root logger, so output follows whatever root configuration the caller set up.
LOG = logging.getLogger()
14
class RankRunner:
    """ Produces the SQL statements that index the placex entries of a
        single address rank.
    """

    def __init__(self, rank):
        # Value compared against placex.rank_address in the generated SQL.
        self.rank = rank

    def name(self):
        """ Label of this runner, used for log and progress output.
        """
        label = "rank {}"
        return label.format(self.rank)

    def sql_count_objects(self):
        """ SQL counting the placex rows of this rank with indexed_status > 0.
        """
        stmt = """SELECT count(*) FROM placex
                  WHERE rank_address = {} and indexed_status > 0
               """
        return stmt.format(self.rank)

    def sql_get_objects(self):
        """ SQL selecting the place_ids of this rank with indexed_status > 0,
            ordered by geometry_sector.
        """
        stmt = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_address = {}
                  ORDER BY geometry_sector"""
        return stmt.format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ SQL resetting indexed_status to 0 for the given place_ids.
        """
        stmt = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return stmt.format(','.join(map(str, ids)))
39
40
class InterpolationRunner:
    """ Produces the SQL statements that index the address interpolation
        lines stored in the table location_property_osmline.
    """

    @staticmethod
    def name():
        """ Label of this runner, used for log and progress output.
        """
        label = "interpolation lines (location_property_osmline)"
        return label

    @staticmethod
    def sql_count_objects():
        """ SQL counting interpolation rows with indexed_status > 0.
        """
        stmt = """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""
        return stmt

    @staticmethod
    def sql_get_objects():
        """ SQL selecting the place_ids of interpolation rows with
            indexed_status > 0, ordered by geometry_sector.
        """
        stmt = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""
        return stmt

    @staticmethod
    def sql_index_place(ids):
        """ SQL resetting indexed_status to 0 for the given place_ids.
        """
        id_list = ','.join(map(str, ids))
        stmt = """UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id IN ({})
               """
        return stmt.format(id_list)
66
class BoundaryRunner:
    """ Produces the SQL statements that index the administrative boundaries
        (class 'boundary', type 'administrative') of one search rank.
    """

    def __init__(self, rank):
        # Value compared against placex.rank_search in the generated SQL.
        self.rank = rank

    def name(self):
        """ Label of this runner, used for log and progress output.
        """
        label = "boundaries rank {}"
        return label.format(self.rank)

    def sql_count_objects(self):
        """ SQL counting administrative boundaries of this search rank with
            indexed_status > 0.
        """
        stmt = """SELECT count(*) FROM placex
                  WHERE indexed_status > 0
                    AND rank_search = {}
                    AND class = 'boundary' and type = 'administrative'
               """
        return stmt.format(self.rank)

    def sql_get_objects(self):
        """ SQL selecting the place_ids of those boundaries, ordered by
            partition and admin_level.
        """
        stmt = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and class = 'boundary' and type = 'administrative'
                  ORDER BY partition, admin_level
               """
        return stmt.format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ SQL resetting indexed_status to 0 for the given place_ids.
        """
        stmt = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return stmt.format(','.join(map(str, ids)))
96
97
class PostcodeRunner:
    """ Produces the SQL statements that index the table location_postcode.
    """

    @staticmethod
    def name():
        """ Label of this runner, used for log and progress output.
        """
        label = "postcodes (location_postcode)"
        return label

    @staticmethod
    def sql_count_objects():
        """ SQL counting postcode rows with indexed_status > 0.
        """
        stmt = "SELECT count(*) FROM location_postcode WHERE indexed_status > 0"
        return stmt

    @staticmethod
    def sql_get_objects():
        """ SQL selecting the place_ids of postcode rows with
            indexed_status > 0, ordered by country_code and postcode.
        """
        stmt = """SELECT place_id FROM location_postcode
                  WHERE indexed_status > 0
                  ORDER BY country_code, postcode"""
        return stmt

    @staticmethod
    def sql_index_place(ids):
        """ SQL resetting indexed_status to 0 for the given place_ids.
        """
        id_list = ','.join(map(str, ids))
        stmt = """UPDATE location_postcode SET indexed_status = 0
                  WHERE place_id IN ({})
               """
        return stmt.format(id_list)
121
class Indexer:
    """ Main indexing routine.

        One main connection (`conn`) selects the objects that need indexing.
        The resulting UPDATE statements are distributed over `num_threads`
        worker connections (`threads`, DBConnection objects).
    """

    def __init__(self, dsn, num_threads):
        self.dsn = dsn
        self.num_threads = num_threads
        self.conn = None
        self.threads = []


    def _setup_connections(self):
        """ Open the main connection and one worker connection per thread.
        """
        self.conn = psycopg2.connect(self.dsn)
        self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]


    def _close_connections(self):
        """ Close the main connection and all worker connections.

            Both attributes are reset afterwards, so a second call is a no-op.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

        for thread in self.threads:
            thread.close()
        # Reset the attribute itself so that closed worker connections are
        # dropped and never closed a second time.
        self.threads = []


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        conn = psycopg2.connect(self.dsn)
        # Let every ANALYSE commit immediately. Without autocommit all of
        # them would run in a single transaction that is never committed.
        # That transaction would hold its locks during the whole indexing run.
        # The collected statistics would be discarded on close().
        conn.autocommit = True

        try:
            self.index_by_rank(0, 4)
            self._analyse_db_if(conn, analyse)

            self.index_boundaries(0, 30)
            self._analyse_db_if(conn, analyse)

            self.index_by_rank(5, 25)
            self._analyse_db_if(conn, analyse)

            self.index_by_rank(26, 30)
            self._analyse_db_if(conn, analyse)

            self.index_postcodes()
            self._analyse_db_if(conn, analyse)
        finally:
            conn.close()

    def _analyse_db_if(self, conn, condition):
        """ Run ANALYSE on `conn` when `condition` is true.
        """
        if condition:
            with conn.cursor() as cur:
                cur.execute('ANALYSE')

    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.

            Ranks are clamped to 4 <= rank < 26. `maxrank` itself is
            excluded, because the upper bound is passed to range().
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        self._setup_connections()

        try:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self.index(BoundaryRunner(rank))
        finally:
            self._close_connections()

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        self._setup_connections()

        try:
            for rank in range(max(1, minrank), maxrank):
                self.index(RankRunner(rank))

            if maxrank == 30:
                # Rank 0 places go first, then interpolations, then rank 30.
                # The last two are sent in batches of 20 ids per statement.
                self.index(RankRunner(0))
                self.index(InterpolationRunner(), 20)
                self.index(RankRunner(30), 20)
            else:
                self.index(RankRunner(maxrank))
        finally:
            self._close_connections()


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._setup_connections()

        try:
            self.index(PostcodeRunner(), 20)
        finally:
            self._close_connections()

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        conn = psycopg2.connect(self.dsn)

        try:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()
        finally:
            conn.close()

    def index(self, obj, batch=1):
        """ Index a single rank or table. `obj` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.

            Requires open connections (see _setup_connections()). Returns
            only after all worker connections have finished.
        """
        LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)

        with self.conn.cursor() as cur:
            cur.execute(obj.sql_count_objects())
            total_tuples = cur.fetchone()[0]

        LOG.debug("Total number of rows: %i", total_tuples)

        progress = ProgressLogger(obj.name(), total_tuples)

        if total_tuples > 0:
            # A named (server-side) cursor fetches the ids from the server
            # batch by batch instead of loading the full result at once.
            with self.conn.cursor(name='places') as cur:
                cur.execute(obj.sql_get_objects())

                next_thread = self.find_free_thread()
                while True:
                    places = [p[0] for p in cur.fetchmany(batch)]
                    if not places:
                        break

                    LOG.debug("Processing places: %s", str(places))
                    thread = next(next_thread)

                    thread.perform(obj.sql_index_place(places))
                    progress.add(len(places))

            for thread in self.threads:
                thread.wait()

        progress.done()

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.

            Connections are polled with is_done(). When none is free,
            select() blocks on the worker connections. After more than
            100000 hand-outs, all workers are drained and reconnected.
        """
        ready = self.threads
        command_stat = 0

        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            # refresh the connections occasionally to avoid potential
            # memory leaks in Postgresql.
            if command_stat > 100000:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                ready, _, _ = select.select(self.threads, [], [])

        assert False, "Unreachable code"
310         assert False, "Unreachable code"