2 Main work horse for indexing (computing addresses) the database.
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
# Module-wide logger. getLogger() is called without a name, which returns
# the root logger rather than a logger named after this module.
12 LOG = logging.getLogger()
15 """ A pool of asynchronous database connections.
17 The pool may be used as a context manager.
19 REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, dsn, pool_size):
    """ Create `pool_size` DBConnection objects for `dsn` and set up
        the iterator that next_free_worker() draws from.
    """
    connections = []
    for _ in range(pool_size):
        connections.append(DBConnection(dsn))
    self.threads = connections
    self.free_workers = self._yield_free_worker()
27 """ Wait for all connections to finish.
29 for thread in self.threads:
30 while not thread.is_done():
33 self.free_workers = self._yield_free_worker()
36 """ Close all connections and clear the pool.
38 for thread in self.threads:
41 self.free_workers = None
def next_free_worker(self):
    """ Return the next connection produced by the pool's
        `free_workers` iterator.
    """
    worker = next(self.free_workers)
    return worker
50 def _yield_free_worker(self):
59 if command_stat > self.REOPEN_CONNECTIONS_AFTER:
60 for thread in self.threads:
61 while not thread.is_done():
66 _, ready, _ = select.select([], self.threads, [])
73 def __exit__(self, exc_type, exc_value, traceback):
78 """ Main indexing routine.
81 def __init__(self, dsn, num_threads):
83 self.num_threads = num_threads
86 def index_full(self, analyse=True):
87 """ Index the complete database. This will first index boundaries
88 followed by all other objects. When `analyse` is True, then the
89 database will be analysed at the appropriate places to
90 ensure that database statistics are updated.
92 with connect(self.dsn) as conn:
93 conn.autocommit = True
97 with conn.cursor() as cur:
98 cur.execute('ANALYZE')
103 self.index_by_rank(0, 4)
106 self.index_boundaries(0, 30)
109 self.index_by_rank(5, 25)
112 self.index_by_rank(26, 30)
115 self.index_postcodes()
119 def index_boundaries(self, minrank, maxrank):
120 """ Index only administrative boundaries within the given rank range.
122 LOG.warning("Starting indexing boundaries using %s threads",
125 for rank in range(max(minrank, 4), min(maxrank, 26)):
126 self._index(runners.BoundaryRunner(rank))
128 def index_by_rank(self, minrank, maxrank):
129 """ Index all entries of placex in the given rank range (inclusive)
130 in order of their address rank.
132 When rank 30 is requested then also interpolations and
133 places with address rank 0 will be indexed.
135 maxrank = min(maxrank, 30)
136 LOG.warning("Starting indexing rank (%i to %i) using %i threads",
137 minrank, maxrank, self.num_threads)
139 for rank in range(max(1, minrank), maxrank):
140 self._index(runners.RankRunner(rank))
143 self._index(runners.RankRunner(0))
144 self._index(runners.InterpolationRunner(), 20)
145 self._index(runners.RankRunner(30), 20)
147 self._index(runners.RankRunner(maxrank))
def index_postcodes(self):
    """ Index the entries of the location_postcode table.
    """
    LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

    # Postcodes are handed to _index() with a batch size of 20.
    runner = runners.PostcodeRunner()
    self._index(runner, batch=20)
158 def update_status_table(self):
159 """ Update the status in the status table to 'indexed'.
161 with connect(self.dsn) as conn:
162 with conn.cursor() as cur:
163 cur.execute('UPDATE import_status SET indexed = true')
167 def _index(self, runner, batch=1):
168 """ Index a single rank or table. `runner` describes the SQL to use
169 for indexing. `batch` describes the number of objects that
170 should be processed with a single SQL statement
172 LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
174 with connect(self.dsn) as conn:
175 with conn.cursor() as cur:
176 total_tuples = cur.scalar(runner.sql_count_objects())
177 LOG.debug("Total number of rows: %i", total_tuples)
181 progress = ProgressLogger(runner.name(), total_tuples)
184 with conn.cursor(name='places') as cur:
185 cur.execute(runner.sql_get_objects())
187 with WorkerPool(self.dsn, self.num_threads) as pool:
189 places = [p[0] for p in cur.fetchmany(batch)]
193 LOG.debug("Processing places: %s", str(places))
194 worker = pool.next_free_worker()
196 worker.perform(runner.sql_index_place(places))
197 progress.add(len(places))