# nominatim/indexer/indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 import logging
5 import select
6
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
11
12 LOG = logging.getLogger()
13
class WorkerPool:
    """ A pool of asynchronous database connections.

        The pool may be used as a context manager.
    """
    REOPEN_CONNECTIONS_AFTER = 100000

    def __init__(self, dsn, pool_size):
        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
        self.free_workers = self._yield_free_worker()


    def finish_all(self):
        """ Wait for all connections to finish.
        """
        for thread in self.threads:
            while not thread.is_done():
                thread.wait()

        self.free_workers = self._yield_free_worker()

    def close(self):
        """ Close all connections and clear the pool.
        """
        for thread in self.threads:
            thread.close()
        self.threads = []
        self.free_workers = None


    def next_free_worker(self):
        """ Get the next free connection.
        """
        return next(self.free_workers)


    def _yield_free_worker(self):
        # Endless generator that yields each connection as soon as it has
        # finished its current command. Once more than
        # REOPEN_CONNECTIONS_AFTER commands have been handed out, all
        # connections are drained and reopened.
        ready = self.threads
        command_stat = 0
        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                ready = self.threads
                command_stat = 0
            else:
                # Block until at least one connection is free to accept
                # the next command.
                _, ready, _ = select.select([], self.threads, [])


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

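# A minimal usage sketch for the pool (illustrative only, not part of the
# public API; it assumes `dsn` is a valid libpq connection string and that
# DBConnection exposes a perform() method for issuing a statement
# asynchronously):
#
#     with WorkerPool(dsn, pool_size=4) as pool:
#         for sql in pending_statements:   # hypothetical list of SQL commands
#             pool.next_free_worker().perform(sql)
#         pool.finish_all()
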
class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries,
            followed by all other objects. When `analyse` is True, the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            if analyse:
                def _analyze():
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')
            else:
                def _analyze():
                    pass

            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()


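# Typical driver code (a sketch; how the tokenizer object is obtained is
# version-specific and assumed here to come from nominatim's tokenizer
# factory):
#
#     indexer = Indexer(dsn, tokenizer, num_threads=4)
#     indexer.index_full(analyse=True)
#     indexer.update_status_table()
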
    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested, interpolations and places with
            address rank 0 will also be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))


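# Example: reindex everything from rank 5 upwards, including the rank-30
# extras described above (illustrative call):
#
#     indexer.index_by_rank(5, 30)
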
    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)


    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with WorkerPool(self.dsn, self.num_threads) as pool:
                        while True:
                            # Fetch the next batch from the server-side
                            # cursor and hand it to the next free
                            # asynchronous connection.
                            places = cur.fetchmany(batch)
                            if not places:
                                break

                            LOG.debug("Processing places: %s", str(places))
                            worker = pool.next_free_worker()

                            runner.index_places(worker, places)
                            progress.add(len(places))

                        pool.finish_all()

                conn.commit()

        progress.done()
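
# For reference, _index() relies only on a small runner interface: name(),
# sql_count_objects(), sql_get_objects() and index_places(). A hypothetical
# minimal runner (the real implementations live in nominatim.indexer.runners;
# table and column names here are made up, and worker.perform() is assumed as
# in the WorkerPool sketch above) might look like this:
#
#     class ExampleRunner:
#         def name(self):
#             return 'example objects'
#
#         def sql_count_objects(self):
#             return 'SELECT count(*) FROM example WHERE indexed_status > 0'
#
#         def sql_get_objects(self):
#             return 'SELECT place_id FROM example WHERE indexed_status > 0'
#
#         @staticmethod
#         def index_places(worker, places):
#             # Issue the update asynchronously on the free connection.
#             worker.perform('UPDATE example SET indexed_status = 0'
#                            ' WHERE place_id IN ({})'
#                            .format(','.join(str(p[0]) for p in places)))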