]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
factor out async connection handling into separate class
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 import logging
5 import select
6
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
11
12 LOG = logging.getLogger()
13
class WorkerPool:
    """ A pool of asynchronous database connections.

        The pool may be used as a context manager.
    """
    # Tear down and reestablish the connections after this many executed
    # commands to keep server-side session state from growing unboundedly.
    REOPEN_CONNECTIONS_AFTER = 100000

    def __init__(self, dsn, pool_size):
        """ Open `pool_size` asynchronous connections to the database
            described by the libpq connection string `dsn`.
        """
        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
        self.free_workers = self._yield_free_worker()


    def finish_all(self):
        """ Wait for all connections to finish their current command.
        """
        for thread in self.threads:
            while not thread.is_done():
                thread.wait()

        # Restart the generator so its internal round-robin state and
        # command counter begin fresh for the next batch of work.
        self.free_workers = self._yield_free_worker()

    def close(self):
        """ Close all connections and clear the pool.

            The pool is unusable afterwards.
        """
        for thread in self.threads:
            thread.close()
        self.threads = []
        self.free_workers = None


    def next_free_worker(self):
        """ Get the next free connection, waiting if necessary.
        """
        return next(self.free_workers)


    def _yield_free_worker(self):
        # Generator that endlessly yields connections that are ready to
        # accept a new command, blocking in select() when none are.
        ready = self.threads
        command_stat = 0
        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
                # Reconnect all workers (draining each one first).
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                ready = self.threads
                # BUGFIX: reset the counter; without this, every
                # subsequent loop iteration would reopen all
                # connections again.
                command_stat = 0
            else:
                # Wait until at least one connection is writable again.
                _, ready, _ = select.select([], self.threads, [])


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
75
76
class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, num_threads):
        """ Set up an indexer against the database described by the libpq
            connection string `dsn`, using `num_threads` parallel
            connections for the actual indexing work.
        """
        self.dsn = dsn
        self.num_threads = num_threads


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            # Choose the ANALYZE strategy once up front instead of
            # re-checking the flag between every stage.
            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        # Boundaries are only indexed for ranks 4..25; clamp the
        # requested range accordingly (upper bound exclusive).
        for rank in range(max(minrank, 4), min(maxrank, 26)):
            self._index(runners.BoundaryRunner(rank))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        for rank in range(max(1, minrank), maxrank):
            self._index(runners.RankRunner(rank))

        if maxrank == 30:
            # Rank 0 and the rank-30 objects (incl. interpolations) are
            # done last; the latter two use a larger batch size of 20.
            self._index(runners.RankRunner(0))
            self._index(runners.InterpolationRunner(), 20)
            self._index(runners.RankRunner(30), 20)
        else:
            self._index(runners.RankRunner(maxrank))


    def index_postcodes(self):
        """Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                # Named (server-side) cursor so the full result set is
                # streamed rather than materialised in client memory.
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with WorkerPool(self.dsn, self.num_threads) as pool:
                        while True:
                            places = [p[0] for p in cur.fetchmany(batch)]
                            if not places:
                                break

                            LOG.debug("Processing places: %s", str(places))
                            worker = pool.next_free_worker()

                            worker.perform(runner.sql_index_place(places))
                            progress.add(len(places))

                        # Let all in-flight statements complete before the
                        # read cursor is closed and the work is committed.
                        pool.finish_all()

                conn.commit()

        progress.done()