# Source: git.openstreetmap.org nominatim.git — nominatim/indexer/indexer.py
# (from commit: "require tokenizer for indexer")
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 import logging
5 import select
6
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
11
12 LOG = logging.getLogger()
13
class WorkerPool:
    """ A fixed-size pool of asynchronous database connections.

        The pool may be used as a context manager; leaving the context
        closes all connections.
    """
    # After this many commands all connections are cycled to avoid
    # accumulating server-side session state.
    REOPEN_CONNECTIONS_AFTER = 100000

    def __init__(self, dsn, pool_size):
        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
        self.free_workers = self._yield_free_worker()


    def finish_all(self):
        """ Block until every connection has finished its current command.
        """
        for db in self.threads:
            while not db.is_done():
                db.wait()

        # Restart the round-robin generator over the now-idle connections.
        self.free_workers = self._yield_free_worker()

    def close(self):
        """ Close all connections and clear the pool.
        """
        for db in self.threads:
            db.close()
        self.threads = []
        self.free_workers = None


    def next_free_worker(self):
        """ Get the next free connection.
        """
        return next(self.free_workers)


    def _yield_free_worker(self):
        # Optimistically treat every connection as ready at first.
        writeable = self.threads
        commands_issued = 0
        while True:
            for db in writeable:
                if db.is_done():
                    commands_issued += 1
                    yield db

            if commands_issued <= self.REOPEN_CONNECTIONS_AFTER:
                # Wait until at least one connection can take a command.
                _, writeable, _ = select.select([], self.threads, [])
            else:
                # Enough work done: drain and reconnect every connection.
                for db in self.threads:
                    while not db.is_done():
                        db.wait()
                    db.connect()
                writeable = self.threads
                commands_issued = 0


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
77
class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn                  # database connection string
        self.tokenizer = tokenizer      # tokenizer for name/address analysis
        self.num_threads = num_threads  # number of parallel DB connections


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            # Choose the ANALYZE strategy once, so the call sites below
            # stay unconditional.
            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        # Boundaries only exist for ranks 4..25, so clamp the requested range.
        for rank in range(max(minrank, 4), min(maxrank, 26)):
            self._index(runners.BoundaryRunner(rank))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        for rank in range(max(1, minrank), maxrank):
            self._index(runners.RankRunner(rank))

        if maxrank == 30:
            # Rank 0 places and interpolations are indexed together with
            # rank 30; the larger batch size speeds up these simple objects.
            self._index(runners.RankRunner(0))
            self._index(runners.InterpolationRunner(), 20)
            self._index(runners.RankRunner(30), 20)
        else:
            self._index(runners.RankRunner(maxrank))


    def index_postcodes(self):
        """Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                # Named cursor: rows are streamed from the server instead
                # of being fetched all at once.
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with WorkerPool(self.dsn, self.num_threads) as pool:
                        while True:
                            # fetchmany() already returns a list; no copy needed.
                            places = cur.fetchmany(batch)
                            if not places:
                                break

                            LOG.debug("Processing places: %s", str(places))
                            worker = pool.next_free_worker()

                            runner.index_places(worker, places)
                            progress.add(len(places))

                        pool.finish_all()

                conn.commit()

        progress.done()