Main workhorse for indexing the database (computing addresses).
4 # pylint: disable=C0111
10 from .progress import ProgressLogger
11 from ..db.async_connection import DBConnection
# Module-wide logger. logging.getLogger() without a name returns the root
# logger, so these messages follow the root logger's level and handlers.
LOG = logging.getLogger()
class RankRunner:
    """ Returns SQL commands for indexing one rank within the placex table.

        The rank is matched against placex.rank_address.
    """

    def __init__(self, rank):
        # Address rank whose pending places are indexed.
        self.rank = rank

    def name(self):
        """ Return a human-readable description for log and progress output.
        """
        return "rank {}".format(self.rank)

    def sql_count_objects(self):
        """ Return the SQL counting the places of this rank that still
            need indexing (indexed_status > 0).
        """
        return """SELECT count(*) FROM placex
                  WHERE rank_address = {} and indexed_status > 0
               """.format(self.rank)

    def sql_get_objects(self):
        """ Return the SQL selecting the place_ids of this rank that still
            need indexing, ordered by geometry_sector.
        """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_address = {}
                  ORDER BY geometry_sector""".format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ Return the SQL marking the given placex rows as indexed.

            `ids` is an iterable of place_ids. Every id is rendered with
            str() directly into the statement, so only trusted numeric ids
            may be passed. It must not be empty: an empty iterable yields
            `IN ()`, which PostgreSQL rejects.
        """
        id_list = ','.join(map(str, ids))
        return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})".format(id_list)
class InterpolationRunner:
    """ Returns SQL commands for indexing the address interpolation table
        location_property_osmline.

        All methods are static; they are called on an instance by the
        indexer, which works for static methods as well.
    """

    @staticmethod
    def name():
        """ Return a human-readable description for log and progress output.
        """
        return "interpolation lines (location_property_osmline)"

    @staticmethod
    def sql_count_objects():
        """ Return the SQL counting interpolation lines that still need
            indexing (indexed_status > 0).
        """
        return """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""

    @staticmethod
    def sql_get_objects():
        """ Return the SQL selecting the place_ids of all interpolation lines
            that still need indexing, ordered by geometry_sector.
        """
        return """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""

    @staticmethod
    def sql_index_place(ids):
        """ Return the SQL marking the given interpolation lines as indexed.

            `ids` is a non-empty iterable of trusted numeric place_ids; each
            is rendered with str() directly into the statement.
        """
        return """UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id IN ({})
               """.format(','.join(map(str, ids)))
class BoundaryRunner:
    """ Returns SQL commands for indexing the administrative boundaries
        (class 'boundary', type 'administrative') of one search rank.
    """

    def __init__(self, rank):
        # Search rank (placex.rank_search) whose boundaries are indexed.
        self.rank = rank

    def name(self):
        """ Return a human-readable description for log and progress output.
        """
        return "boundaries rank {}".format(self.rank)

    def sql_count_objects(self):
        """ Return the SQL counting the boundaries of this rank that still
            need indexing.

            Must select exactly the rows of sql_get_objects(), because the
            indexer uses this count as the progress total for those rows.
        """
        # NOTE(review): the rank_search condition mirrors sql_get_objects();
        # confirm against the original query.
        return """SELECT count(*) FROM placex
                  WHERE indexed_status > 0
                    AND rank_search = {}
                    AND class = 'boundary' and type = 'administrative'
               """.format(self.rank)

    def sql_get_objects(self):
        """ Return the SQL selecting the place_ids of the boundaries of this
            rank that still need indexing, ordered by partition and
            admin_level.
        """
        return """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {}
                        and class = 'boundary' and type = 'administrative'
                  ORDER BY partition, admin_level
               """.format(self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ Return the SQL marking the given placex rows as indexed.

            `ids` is a non-empty iterable of trusted numeric place_ids; each
            is rendered with str() directly into the statement.
        """
        id_list = ','.join(map(str, ids))
        return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})".format(id_list)
class PostcodeRunner:
    """ Provides the SQL commands for indexing the location_postcode table.

        All methods are static; they are called on an instance by the
        indexer, which works for static methods as well.
    """

    @staticmethod
    def name():
        """ Return a human-readable description for log and progress output.
        """
        return "postcodes (location_postcode)"

    @staticmethod
    def sql_count_objects():
        """ Return the SQL counting postcodes that still need indexing.
        """
        return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0'

    @staticmethod
    def sql_get_objects():
        """ Return the SQL selecting the place_ids of all postcodes that
            still need indexing, ordered by country_code and postcode.
        """
        return """SELECT place_id FROM location_postcode
                  WHERE indexed_status > 0
                  ORDER BY country_code, postcode"""

    @staticmethod
    def sql_index_place(ids):
        """ Return the SQL marking the given postcode rows as indexed.

            `ids` is a non-empty iterable of trusted numeric place_ids; each
            is rendered with str() directly into the statement.
        """
        return """UPDATE location_postcode SET indexed_status = 0
                  WHERE place_id IN ({})
               """.format(','.join(map(str, ids)))
# Indexing driver class. Its `class` header line is not part of this listing.
# It runs the *Runner classes above over a pool of worker connections.
123 """ Main indexing routine.
# Constructor: takes the libpq connection string (`dsn`, passed to
# psycopg2.connect elsewhere in this class) and the number of worker
# connections to open.
# NOTE(review): `self.dsn` is read by other methods, but its assignment is
# not visible here. It is presumably on a line missing from this listing;
# confirm.
126 def __init__(self, dsn, num_threads):
128 self.num_threads = num_threads
# Open the connections used by index():
# - one synchronous psycopg2 connection (`self.conn`) for counting and
#   fetching the place ids;
# - `num_threads` DBConnection workers (`self.threads`) that are sent the
#   UPDATE statements.
# NOTE(review): both attributes are overwritten without closing any previous
# connections, so every call must be paired with _close_connections().
133 def _setup_connections(self):
134 self.conn = psycopg2.connect(self.dsn)
135 self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]
# Called after every indexing pass, as the counterpart of
# _setup_connections(). It iterates over the worker connections in
# `self.threads`.
# NOTE(review): the loop body and any handling of `self.conn` are on lines
# missing from this listing. Presumably every connection is closed here;
# confirm.
138 def _close_connections(self):
143 for thread in self.threads:
# NOTE(review): the docstring's order is approximate. The code first indexes
# address ranks up to 4, then boundaries, then ranks 5-25, then ranks 26-30
# (including rank-0 places and interpolation lines), and finally postcodes.
148 def index_full(self, analyse=True):
149 """ Index the complete database. This will first index boundaries
150 followed by all other objects. When `analyse` is True, then the
151 database will be analysed at the appropriate places to
152 ensure that database statistics are updated.
# Separate connection, used for the ANALYSE runs between the steps.
# NOTE(review): no close or commit of this connection is visible in this
# listing. If it is neither closed nor in autocommit mode, it leaks, and the
# ANALYSE results may be discarded with the open transaction. Confirm.
154 conn = psycopg2.connect(self.dsn)
# Lowest address ranks first (index_by_rank treats maxrank as inclusive).
157 self.index_by_rank(0, 4)
158 self._analyse_db_if(conn, analyse)
# Administrative boundaries. index_boundaries() clamps the ranks to 4-25.
160 self.index_boundaries(0, 30)
161 self._analyse_db_if(conn, analyse)
163 self.index_by_rank(5, 25)
164 self._analyse_db_if(conn, analyse)
# Requesting rank 30 also indexes rank-0 places and interpolation lines.
166 self.index_by_rank(26, 30)
167 self._analyse_db_if(conn, analyse)
169 self.index_postcodes()
170 self._analyse_db_if(conn, analyse)
# Run ANALYSE on `conn` so that the database statistics reflect the
# freshly indexed data.
# NOTE(review): the `condition` guard implied by the method name and
# parameter is on a line missing from this listing. As shown, ANALYSE would
# run unconditionally; confirm.
174 def _analyse_db_if(self, conn, condition):
176 with conn.cursor() as cur:
177 cur.execute('ANALYSE')
# NOTE(review): unlike index_by_rank(), `maxrank` is exclusive here
# (range() upper bound), and the ranks are clamped to 4-25.
179 def index_boundaries(self, minrank, maxrank):
180 """ Index only administrative boundaries within the given rank range.
# NOTE(review): the argument for the `%s` placeholder is on a line missing
# from this listing (presumably self.num_threads, as in index_postcodes()).
182 LOG.warning("Starting indexing boundaries using %s threads",
185 self._setup_connections()
188 for rank in range(max(minrank, 4), min(maxrank, 26)):
189 self.index(BoundaryRunner(rank))
191 self._close_connections()
193 def index_by_rank(self, minrank, maxrank):
194 """ Index all entries of placex in the given rank range (inclusive)
195 in order of their address rank.
197 When rank 30 is requested then also interpolations and
198 places with address rank 0 will be indexed.
# Ranks above 30 are capped at 30.
200 maxrank = min(maxrank, 30)
201 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
202 minrank, maxrank, self.num_threads)
204 self._setup_connections()
# Every rank below maxrank. Rank 0 is never indexed by this loop because the
# lower bound is at least 1.
207 for rank in range(max(1, minrank), maxrank):
208 self.index(RankRunner(rank))
# NOTE(review): the lines choosing between the two cases below are missing
# from this listing. Per the docstring, the first case applies when maxrank
# is 30.
# Rank-30 case: rank-0 places, interpolation lines and rank 30 itself. The
# latter two are processed 20 ids per SQL statement.
211 self.index(RankRunner(0))
212 self.index(InterpolationRunner(), 20)
213 self.index(RankRunner(30), 20)
# Other case: index maxrank itself, which makes the range inclusive.
215 self.index(RankRunner(maxrank))
217 self._close_connections()
220 def index_postcodes(self):
221 """Index the entries of the location_postcode table.
223 LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
225 self._setup_connections()
# Postcodes are processed 20 ids per SQL statement.
228 self.index(PostcodeRunner(), 20)
230 self._close_connections()
232 def update_status_table(self):
233 """ Update the status in the status table to 'indexed'.
# NOTE(review): psycopg2 connections do not autocommit by default, and
# `with conn.cursor()` only closes the cursor. This UPDATE therefore
# persists only if conn.commit() follows. The commit and conn.close() are
# not visible in this listing; confirm they exist.
235 conn = psycopg2.connect(self.dsn)
238 with conn.cursor() as cur:
239 cur.execute('UPDATE import_status SET indexed = true')
# `obj` must provide name(), sql_count_objects(), sql_get_objects() and
# sql_index_place(ids), which is the interface of the *Runner classes.
# This method uses self.conn and self.threads, so _setup_connections() must
# have been called first.
245 def index(self, obj, batch=1):
246 """ Index a single rank or table. `obj` describes the SQL to use
247 for indexing. `batch` describes the number of objects that
248 should be processed with a single SQL statement
250 LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)
# Count the pending rows first so that progress has a total to report
# against.
252 cur = self.conn.cursor()
253 cur.execute(obj.sql_count_objects())
255 total_tuples = cur.fetchone()[0]
256 LOG.debug("Total number of rows: %i", total_tuples)
260 progress = ProgressLogger(obj.name(), total_tuples)
# A named psycopg2 cursor is a server-side cursor. The ids are pulled from
# the server `batch` at a time instead of all at once.
# NOTE(review): `cur` is rebound here; whether the counting cursor is closed
# first is not visible in this listing.
263 cur = self.conn.cursor(name='places')
264 cur.execute(obj.sql_get_objects())
# Generator that hands out the next worker connection that is free.
266 next_thread = self.find_free_thread()
# NOTE(review): the enclosing loop and its exit on an empty batch are on
# lines missing from this listing.
268 places = [p[0] for p in cur.fetchmany(batch)]
272 LOG.debug("Processing places: %s", str(places))
273 thread = next(next_thread)
# Send this batch's UPDATE to the chosen worker and advance the progress.
275 thread.perform(obj.sql_index_place(places))
276 progress.add(len(places))
# NOTE(review): the body of this final loop over the workers is missing.
# Presumably it waits for outstanding statements to finish; confirm.
280 for thread in self.threads:
285 def find_free_thread(self):
286 """ Generator that returns the next connection that is free for
# NOTE(review): most of this body is on lines missing from this listing:
# the outer loop, the yield, and the setup and increment of command_stat.
298 # refresh the connections occasionally to avoid potential
299 # memory leaks in PostgreSQL.
300 if command_stat > 100000:
# Before refreshing, presumably waits until every worker has finished its
# current statement (the inner loop body is not visible).
301 for thread in self.threads:
302 while not thread.is_done():
# Block until at least one worker connection is readable. select() accepts
# objects with a fileno() method, so DBConnection presumably provides one.
308 ready, _, _ = select.select(self.threads, [], [])
# Marks the end of the generator loop as unreachable. Note that assert
# statements are removed when Python runs with -O.
310 assert False, "Unreachable code"