]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
move setup function to python
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 # pylint: disable=C0111
5 import logging
6 import select
7
8 import psycopg2
9
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
12
# Logger used throughout this module. getLogger() is called without a name,
# which returns the root logger, so output from the indexer follows whatever
# configuration has been applied to the root logger.
LOG = logging.getLogger()
14
class RankRunner:
    """ Builds the SQL statements used to index all placex rows that have
        one particular address rank (`rank_address`).
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        """ Label for this runner, used in log and progress messages. """
        return "rank {}".format(self.rank)

    def sql_count_objects(self):
        """ Query counting the rows of this rank with indexed_status > 0. """
        return """SELECT count(*) FROM placex
                  WHERE rank_address = {} and indexed_status > 0
               """.format(self.rank)

    def sql_get_objects(self):
        """ Query returning the place_ids of the rows of this rank with
            indexed_status > 0, ordered by geometry_sector.
        """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_address = {}
                  ORDER BY geometry_sector""".format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ Statement setting indexed_status = 0 for the given place_ids. """
        template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return template.format(','.join(map(str, ids)))
39
40
class InterpolationRunner:
    """ Builds the SQL statements used to index the address interpolation
        lines kept in the table location_property_osmline.
    """

    @staticmethod
    def name():
        """ Label for this runner, used in log and progress messages. """
        return "interpolation lines (location_property_osmline)"

    @staticmethod
    def sql_count_objects():
        """ Query counting the interpolation lines with indexed_status > 0. """
        return """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""

    @staticmethod
    def sql_get_objects():
        """ Query returning the place_ids of interpolation lines with
            indexed_status > 0, ordered by geometry_sector.
        """
        return """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""

    @staticmethod
    def sql_index_place(ids):
        """ Statement setting indexed_status = 0 for the given place_ids. """
        id_list = ','.join(map(str, ids))
        return """UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id IN ({})
               """.format(id_list)
66
class BoundaryRunner:
    """ Builds the SQL statements used to index the administrative
        boundaries (class 'boundary', type 'administrative') of one
        particular search rank (`rank_search`).
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        """ Label for this runner, used in log and progress messages. """
        return "boundaries rank {}".format(self.rank)

    def sql_count_objects(self):
        """ Query counting the boundaries of this rank with indexed_status > 0. """
        return """SELECT count(*) FROM placex
                  WHERE indexed_status > 0
                    AND rank_search = {}
                    AND class = 'boundary' and type = 'administrative'
               """.format(self.rank)

    def sql_get_objects(self):
        """ Query returning the place_ids of the boundaries of this rank with
            indexed_status > 0, ordered by partition and admin_level.
        """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and class = 'boundary' and type = 'administrative'
                  ORDER BY partition, admin_level
               """.format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ Statement setting indexed_status = 0 for the given place_ids. """
        template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return template.format(','.join(map(str, ids)))
96
97
class PostcodeRunner:
    """ Builds the SQL statements used to index the table location_postcode.
    """

    @staticmethod
    def name():
        """ Label for this runner, used in log and progress messages. """
        return "postcodes (location_postcode)"

    @staticmethod
    def sql_count_objects():
        """ Query counting the postcodes with indexed_status > 0. """
        return ('SELECT count(*) FROM location_postcode'
                ' WHERE indexed_status > 0')

    @staticmethod
    def sql_get_objects():
        """ Query returning the place_ids of postcodes with indexed_status > 0,
            ordered by country_code and postcode.
        """
        return """SELECT place_id FROM location_postcode
                  WHERE indexed_status > 0
                  ORDER BY country_code, postcode"""

    @staticmethod
    def sql_index_place(ids):
        """ Statement setting indexed_status = 0 for the given place_ids. """
        id_list = ','.join(map(str, ids))
        return """UPDATE location_postcode SET indexed_status = 0
                  WHERE place_id IN ({})
               """.format(id_list)
121
122
123 def _analyse_db_if(conn, condition):
124     if condition:
125         with conn.cursor() as cur:
126             cur.execute('ANALYSE')
127
128
class Indexer:
    """ Main indexing routine.

        Every indexing pass opens one psycopg2 connection, used to select
        the objects that need indexing, and `num_threads` DBConnection
        objects, which execute the indexing statements in parallel.
    """

    def __init__(self, dsn, num_threads):
        self.dsn = dsn
        self.num_threads = num_threads
        self.conn = None    # psycopg2 connection used for selecting objects
        self.threads = []   # DBConnection objects that run the updates


    def _setup_connections(self):
        """ Open the selection connection and `num_threads` DBConnections.

            The DBConnections are appended one at a time. If opening one of
            them fails, every connection created so far is already stored on
            the instance and is released by `_close_connections()`.
        """
        self.conn = psycopg2.connect(self.dsn)
        self.threads = []
        for _ in range(self.num_threads):
            self.threads.append(DBConnection(self.dsn))


    def _close_connections(self):
        """ Close all connections opened by `_setup_connections()`.
            Safe to call when nothing, or only part, has been opened.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

        for thread in self.threads:
            thread.close()
        self.threads = []


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        conn = psycopg2.connect(self.dsn)

        try:
            # Without autocommit, ANALYSE would run in psycopg2's implicit
            # transaction. That transaction is never committed here, and
            # conn.close() below discards uncommitted work, so the new
            # statistics would never reach the other connections.
            conn.autocommit = True

            self.index_by_rank(0, 4)
            _analyse_db_if(conn, analyse)

            self.index_boundaries(0, 30)
            _analyse_db_if(conn, analyse)

            self.index_by_rank(5, 25)
            _analyse_db_if(conn, analyse)

            self.index_by_rank(26, 30)
            _analyse_db_if(conn, analyse)

            self.index_postcodes()
            _analyse_db_if(conn, analyse)
        finally:
            conn.close()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.

            Ranks run from max(minrank, 4) up to, but excluding,
            min(maxrank, 26).
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        try:
            # Opened inside the try so that partially opened connections
            # are closed again when setup fails.
            self._setup_connections()
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self.index(BoundaryRunner(rank))
        finally:
            self._close_connections()

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            `maxrank` is capped at 30 and ranks below 1 are skipped. When
            rank 30 is requested, then also interpolations and places with
            address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        try:
            self._setup_connections()
            for rank in range(max(1, minrank), maxrank):
                self.index(RankRunner(rank))

            if maxrank == 30:
                self.index(RankRunner(0))
                self.index(InterpolationRunner(), 20)
                self.index(RankRunner(30), 20)
            else:
                self.index(RankRunner(maxrank))
        finally:
            self._close_connections()


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        try:
            self._setup_connections()
            self.index(PostcodeRunner(), 20)
        finally:
            self._close_connections()

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        conn = psycopg2.connect(self.dsn)

        try:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()
        finally:
            conn.close()

    def index(self, obj, batch=1):
        """ Index a single rank or table. `obj` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.

            Requires connections opened by `_setup_connections()`.
        """
        LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)

        with self.conn.cursor() as cur:
            cur.execute(obj.sql_count_objects())
            total_tuples = cur.fetchone()[0]

        LOG.debug("Total number of rows: %i", total_tuples)

        progress = ProgressLogger(obj.name(), total_tuples)

        if total_tuples > 0:
            # A named cursor is a server-side cursor in psycopg2; the ids
            # are pulled from it batch by batch with fetchmany().
            with self.conn.cursor(name='places') as cur:
                cur.execute(obj.sql_get_objects())

                next_thread = self.find_free_thread()
                while True:
                    places = [p[0] for p in cur.fetchmany(batch)]
                    if not places:
                        break

                    LOG.debug("Processing places: %s", str(places))
                    thread = next(next_thread)

                    thread.perform(obj.sql_index_place(places))
                    progress.add(len(places))

            # Let all outstanding statements finish before reporting done.
            for thread in self.threads:
                thread.wait()

        progress.done()

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.

            Once more than 100000 connections have been handed out, it waits
            for all connections to finish and reconnects them.
        """
        ready = self.threads
        command_stat = 0

        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            # refresh the connections occasionally to avoid potential
            # memory leaks in Postgresql.
            if command_stat > 100000:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                # Block until at least one connection becomes readable.
                ready, _, _ = select.select(self.threads, [], [])

        assert False, "Unreachable code"