nominatim/indexer/indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
import logging
import time

import psycopg2.extras

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect

LOG = logging.getLogger()


class PlaceFetcher:
    """ Asynchronous connection that fetches place details for processing.
    """
    def __init__(self, dsn, setup_conn):
        self.wait_time = 0
        self.current_ids = None
        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)

        with setup_conn.cursor() as cur:
            # The hstore OIDs have to be fetched manually on a synchronous
            # connection because register_hstore cannot look them up on
            # the asynchronous connection below.
            hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
            hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")

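        # Register the hstore type on the asynchronous connection, using
        # the OIDs just fetched on the synchronous one.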
        psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                        array_oid=hstore_array_oid)

    def close(self):
        """ Close the underlying asynchronous connection.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

    def fetch_next_batch(self, cur, runner):
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Returns True if there is still data available.
        """
        ids = cur.fetchmany(100)

        if not ids:
            self.current_ids = None
            return False

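        # Runners that implement get_place_details() need additional data
        # for each place. Request it asynchronously; an empty list marks
        # the batch as pending until get_batch() collects the results.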
        if hasattr(runner, 'get_place_details'):
            runner.get_place_details(self.conn, ids)
            self.current_ids = []
        else:
            self.current_ids = ids

        return True

    def get_batch(self):
        """ Get the next batch of data, previously requested with
            `fetch_next_batch`.
        """
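        # An empty list means that a request for place details is still
        # in flight. Wait for it to finish and collect the results.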
        if self.current_ids is not None and not self.current_ids:
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = self.conn.cursor.fetchall()

        return self.current_ids

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
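        # Make sure any pending request has finished before closing.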
        self.conn.wait()
        self.close()


class Indexer:
    """ Main indexing routine.
    """

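    # Typical usage (a sketch only; in practice the nominatim CLI
    # sets up the tokenizer and drives the indexer):
    #
    #   indexer = Indexer(dsn, tokenizer, num_threads=4)
    #   indexer.index_full()
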
    def __init__(self, dsn, tokenizer, num_threads):
        self.dsn = dsn
        self.tokenizer = tokenizer
        self.num_threads = num_threads

    def has_pending(self):
        """ Check if any data still needs indexing.
            This function must only be used after the import has finished.
            Otherwise it will be very expensive.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
                return cur.rowcount > 0

    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. If `analyse` is True, the
            database is analysed at appropriate points to keep the
            database statistics up to date.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            def _analyze():
                if analyse:
                    with conn.cursor() as cur:
                        cur.execute('ANALYZE')

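            # Boundaries and the lowest ranks go first because the
            # addresses of all remaining places are computed from the
            # areas they fall into.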
            self.index_by_rank(0, 4)
            _analyze()

            self.index_boundaries(0, 30)
            _analyze()

            self.index_by_rank(5, 25)
            _analyze()

            self.index_by_rank(26, 30)
            _analyze()

            self.index_postcodes()
            _analyze()

    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank, analyzer))

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested, interpolations and places with
            address rank 0 will be indexed as well.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        with self.tokenizer.name_analyzer() as analyzer:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank, analyzer))

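            # Rank 30 additionally covers interpolations and places with
            # address rank 0. Both are processed in batches of 20 as the
            # individual objects are cheap to index.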
            if maxrank == 30:
                self._index(runners.RankRunner(0, analyzer))
                self._index(runners.InterpolationRunner(analyzer), 20)
                self._index(runners.RankRunner(30, analyzer), 20)
            else:
                self._index(runners.RankRunner(maxrank, analyzer))

    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._index(runners.PostcodeRunner(), 20)

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            psycopg2.extras.register_hstore(conn)
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    with PlaceFetcher(self.dsn, conn) as fetcher:
                        with WorkerPool(self.dsn, self.num_threads) as pool:
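                            # Keep one batch in flight at all times:
                            # request the next batch before handing the
                            # current one over to the worker pool.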
                            has_more = fetcher.fetch_next_batch(cur, runner)
                            while has_more:
                                places = fetcher.get_batch()

                                # asynchronously get the next batch
                                has_more = fetcher.fetch_next_batch(cur, runner)

                                # and index the current batch
                                for idx in range(0, len(places), batch):
                                    part = places[idx:idx + batch]
                                    LOG.debug("Processing places: %s", str(part))
                                    runner.index_places(pool.next_free_worker(), part)
                                    progress.add(len(part))

                            LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
                                     fetcher.wait_time, pool.wait_time)

                conn.commit()

        progress.done()