]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
add consistent SPDX copyright headers
[nominatim.git] / nominatim / indexer / indexer.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Main work horse for indexing (computing addresses) the database.
9 """
10 import logging
11 import time
12
13 import psycopg2.extras
14
15 from nominatim.indexer.progress import ProgressLogger
16 from nominatim.indexer import runners
17 from nominatim.db.async_connection import DBConnection, WorkerPool
18 from nominatim.db.connection import connect
19
20 LOG = logging.getLogger()
21
22
23 class PlaceFetcher:
24     """ Asynchronous connection that fetches place details for processing.
25     """
26     def __init__(self, dsn, setup_conn):
27         self.wait_time = 0
28         self.current_ids = None
29         self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
30
31         with setup_conn.cursor() as cur:
32             # need to fetch those manually because register_hstore cannot
33             # fetch them on an asynchronous connection below.
34             hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
35             hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
36
37         psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
38                                         array_oid=hstore_array_oid)
39
40     def close(self):
41         """ Close the underlying asynchronous connection.
42         """
43         if self.conn:
44             self.conn.close()
45             self.conn = None
46
47
48     def fetch_next_batch(self, cur, runner):
49         """ Send a request for the next batch of places.
50             If details for the places are required, they will be fetched
51             asynchronously.
52
53             Returns true if there is still data available.
54         """
55         ids = cur.fetchmany(100)
56
57         if not ids:
58             self.current_ids = None
59             return False
60
61         if hasattr(runner, 'get_place_details'):
62             runner.get_place_details(self.conn, ids)
63             self.current_ids = []
64         else:
65             self.current_ids = ids
66
67         return True
68
69     def get_batch(self):
70         """ Get the next batch of data, previously requested with
71             `fetch_next_batch`.
72         """
73         if self.current_ids is not None and not self.current_ids:
74             tstart = time.time()
75             self.conn.wait()
76             self.wait_time += time.time() - tstart
77             self.current_ids = self.conn.cursor.fetchall()
78
79         return self.current_ids
80
81     def __enter__(self):
82         return self
83
84
85     def __exit__(self, exc_type, exc_value, traceback):
86         self.conn.wait()
87         self.close()
88
89
90 class Indexer:
91     """ Main indexing routine.
92     """
93
94     def __init__(self, dsn, tokenizer, num_threads):
95         self.dsn = dsn
96         self.tokenizer = tokenizer
97         self.num_threads = num_threads
98
99
100     def has_pending(self):
101         """ Check if any data still needs indexing.
102             This function must only be used after the import has finished.
103             Otherwise it will be very expensive.
104         """
105         with connect(self.dsn) as conn:
106             with conn.cursor() as cur:
107                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
108                 return cur.rowcount > 0
109
110
111     def index_full(self, analyse=True):
112         """ Index the complete database. This will first index boundaries
113             followed by all other objects. When `analyse` is True, then the
114             database will be analysed at the appropriate places to
115             ensure that database statistics are updated.
116         """
117         with connect(self.dsn) as conn:
118             conn.autocommit = True
119
120             def _analyze():
121                 if analyse:
122                     with conn.cursor() as cur:
123                         cur.execute('ANALYZE')
124
125             self.index_by_rank(0, 4)
126             _analyze()
127
128             self.index_boundaries(0, 30)
129             _analyze()
130
131             self.index_by_rank(5, 25)
132             _analyze()
133
134             self.index_by_rank(26, 30)
135             _analyze()
136
137             self.index_postcodes()
138             _analyze()
139
140
141     def index_boundaries(self, minrank, maxrank):
142         """ Index only administrative boundaries within the given rank range.
143         """
144         LOG.warning("Starting indexing boundaries using %s threads",
145                     self.num_threads)
146
147         with self.tokenizer.name_analyzer() as analyzer:
148             for rank in range(max(minrank, 4), min(maxrank, 26)):
149                 self._index(runners.BoundaryRunner(rank, analyzer))
150
151     def index_by_rank(self, minrank, maxrank):
152         """ Index all entries of placex in the given rank range (inclusive)
153             in order of their address rank.
154
155             When rank 30 is requested then also interpolations and
156             places with address rank 0 will be indexed.
157         """
158         maxrank = min(maxrank, 30)
159         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
160                     minrank, maxrank, self.num_threads)
161
162         with self.tokenizer.name_analyzer() as analyzer:
163             for rank in range(max(1, minrank), maxrank):
164                 self._index(runners.RankRunner(rank, analyzer))
165
166             if maxrank == 30:
167                 self._index(runners.RankRunner(0, analyzer))
168                 self._index(runners.InterpolationRunner(analyzer), 20)
169                 self._index(runners.RankRunner(30, analyzer), 20)
170             else:
171                 self._index(runners.RankRunner(maxrank, analyzer))
172
173
174     def index_postcodes(self):
175         """Index the entries ofthe location_postcode table.
176         """
177         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
178
179         self._index(runners.PostcodeRunner(), 20)
180
181
182     def update_status_table(self):
183         """ Update the status in the status table to 'indexed'.
184         """
185         with connect(self.dsn) as conn:
186             with conn.cursor() as cur:
187                 cur.execute('UPDATE import_status SET indexed = true')
188
189             conn.commit()
190
191     def _index(self, runner, batch=1):
192         """ Index a single rank or table. `runner` describes the SQL to use
193             for indexing. `batch` describes the number of objects that
194             should be processed with a single SQL statement
195         """
196         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
197
198         with connect(self.dsn) as conn:
199             psycopg2.extras.register_hstore(conn)
200             with conn.cursor() as cur:
201                 total_tuples = cur.scalar(runner.sql_count_objects())
202                 LOG.debug("Total number of rows: %i", total_tuples)
203
204             conn.commit()
205
206             progress = ProgressLogger(runner.name(), total_tuples)
207
208             if total_tuples > 0:
209                 with conn.cursor(name='places') as cur:
210                     cur.execute(runner.sql_get_objects())
211
212                     with PlaceFetcher(self.dsn, conn) as fetcher:
213                         with WorkerPool(self.dsn, self.num_threads) as pool:
214                             has_more = fetcher.fetch_next_batch(cur, runner)
215                             while has_more:
216                                 places = fetcher.get_batch()
217
218                                 # asynchronously get the next batch
219                                 has_more = fetcher.fetch_next_batch(cur, runner)
220
221                                 # And insert the curent batch
222                                 for idx in range(0, len(places), batch):
223                                     part = places[idx:idx + batch]
224                                     LOG.debug("Processing places: %s", str(part))
225                                     runner.index_places(pool.next_free_worker(), part)
226                                     progress.add(len(part))
227
228                             LOG.info("Wait time: fetcher: %.2fs,  pool: %.2fs",
229                                      fetcher.wait_time, pool.wait_time)
230
231                 conn.commit()
232
233         progress.done()