self.conn = make_connection(options)
self.threads = []
- self.poll = select.poll()
for i in range(options.threads):
t = IndexingThread(i, options)
self.threads.append(t)
- self.poll.register(t, select.EPOLLIN)
def run(self):
# Index each rank from options.minrank upwards, one RankRunner per rank.
# Ranks from 30 up are not covered by the loop below; they are left to the
# `maxrank >= 30` branch at the end of this method.
log.info("Starting indexing rank ({} to {}) using {} threads".format(
self.options.minrank, self.options.maxrank,
self.options.threads))
- for rank in range(self.options.minrank, 30):
# NOTE(review): range() stops before its upper bound, so when maxrank < 30
# the rank equal to maxrank itself is not indexed by this loop, although
# the log message above reports the run as "minrank to maxrank" — confirm
# this is intended.
+ for rank in range(self.options.minrank, min(self.options.maxrank, 30)):
self.index(RankRunner(rank))
if self.options.maxrank >= 30:
def index(self, obj):
# Index every place selected by the runner `obj`: walk the sectors returned
# by obj.sql_index_sectors(), fetch the places of each sector, and hand each
# place_id to the next free worker thread from find_free_thread().
log.info("Starting {}".format(obj.name()))
- cur = self.conn.cursor(name="main")
+ cur = self.conn.cursor(name='main')
cur.execute(obj.sql_index_sectors())
# NOTE(review): total_tuples is only set to 0 in the lines shown here; it is
# presumably summed from the sector rows elsewhere. min_grouped_tuples and
# the final "Done x/y" log line are only meaningful if it holds that total.
total_tuples = 0
next_thread = self.find_free_thread()
done_tuples = 0
rank_start_time = datetime.now()
+
# The SQL texts do not depend on the sector (it is passed as a query
# parameter), so build them once instead of on every iteration.
+ sector_sql = obj.sql_sector_places()
+ index_sql = obj.sql_index_place()
# Once done_tuples passes this mark, fewer than threads*1000 tuples remain,
# and they are fetched with a single sector-less query instead.
+ min_grouped_tuples = total_tuples - len(self.threads) * 1000
for r in cur:
sector = r[0]
# Should we do the remaining ones together?
- do_all = total_tuples - done_tuples < len(self.threads) * 1000
+ do_all = done_tuples > min_grouped_tuples
# NOTE(review): pcur is only closed on the normal path (pcur.close() below);
# an exception from execute() or perform() leaves the named cursor open.
# Consider try/finally.
pcur = self.conn.cursor(name='places')
if do_all:
pcur.execute(obj.sql_nosector_places())
else:
- pcur.execute(obj.sql_sector_places(), (sector, ))
+ pcur.execute(sector_sql, (sector, ))
for place in pcur:
place_id = place[0]
log.debug("Processing place {}".format(place_id))
# Blocks inside the generator until some worker reports is_done().
thread = next(next_thread)
- thread.perform(obj.sql_index_place(), (place_id,))
+ thread.perform(index_sql, (place_id,))
done_tuples += 1
pcur.close()
rank_end_time = datetime.now()
# NOTE(review): diff_seconds can be 0.0 for a very fast run, in which case
# done_tuples/diff_seconds below raises ZeroDivisionError. Consider guarding.
diff_seconds = (rank_end_time-rank_start_time).total_seconds()
- log.info("Done {} in {} @ {} per second - FINISHED {}\n".format(
- done_tuples, int(diff_seconds),
+ log.info("Done {}/{} in {} @ {} per second - FINISHED {}\n".format(
+ done_tuples, total_tuples, int(diff_seconds),
done_tuples/diff_seconds, obj.name()))
def find_free_thread(self):
    """Yield worker threads from ``self.threads`` as they become free.

    A thread is only yielded when its ``is_done()`` returns True. On the
    first pass every thread is checked; afterwards the generator blocks in
    ``select.select`` until at least one thread's file descriptor (via its
    ``fileno()``) is readable, and re-checks only the ready ones.

    The generator never finishes on its own; callers pull from it with
    ``next()``.

    Raises:
        ValueError: if ``self.threads`` is empty. Without any thread,
            ``select.select([], [], [])`` would block forever on Unix.
    """
    if not self.threads:
        raise ValueError("find_free_thread() needs at least one worker thread")

    ready = self.threads
    while True:
        for thread in ready:
            if thread.is_done():
                yield thread
        # Sleep until some worker signals readiness on its descriptor.
        ready, _, _ = select.select(self.threads, [], [])
    # The loop above has no exit. The former `assert(False, ...)` asserted a
    # non-empty tuple (always true), so it was dead and misleading; removed.
def sql_sector_places(self):
    """Return the SQL selecting this rank's pending places in one sector.

    Selects ``place_id`` from ``placex`` rows with ``indexed_status > 0``,
    ``rank_search`` equal to ``self.rank`` and a given ``geometry_sector``.
    ``self.rank`` is substituted into the text directly. The sector is left
    as the single ``%s`` placeholder, to be bound by the caller's query
    parameters.
    """
    return """SELECT place_id FROM placex
              WHERE indexed_status > 0 and rank_search = {}
                and geometry_sector = %s""".format(self.rank)
def sql_index_place(self):
    """Build the statement that indexes a single place.

    It EXECUTEs the ``rnk_index`` prepared statement (presumably PREPAREd
    on each worker connection elsewhere — confirm). Its single ``%s``
    placeholder is bound to the place_id by the caller.
    """
    statement = "EXECUTE rnk_index(%s)"
    return statement