2 Main work horse for indexing (computing addresses) the database.
7 from nominatim.indexer.progress import ProgressLogger
8 from nominatim.indexer import runners
9 from nominatim.db.async_connection import DBConnection
10 from nominatim.db.connection import connect
12 LOG = logging.getLogger()
15 """ A pool of asynchronous database connections.
17 The pool may be used as a context manager.
19 REOPEN_CONNECTIONS_AFTER = 100000
def __init__(self, dsn, pool_size):
    """ Create a pool of `pool_size` asynchronous connections to the
        database described by `dsn`.
    """
    # All connections are opened eagerly, one per worker slot.
    self.threads = [DBConnection(dsn) for _ in range(pool_size)]
    # Generator that hands out connections as they become idle.
    self.free_workers = self._yield_free_worker()
def finish_all(self):
    """ Wait for all connections to finish their current command.
    """
    for thread in self.threads:
        while not thread.is_done():
            thread.wait()

    # Restart the hand-out generator so it scans from a clean state.
    self.free_workers = self._yield_free_worker()
def close(self):
    """ Close all connections and clear the pool.
    """
    for thread in self.threads:
        thread.close()

    # Drop all references so the pool cannot be used afterwards.
    self.threads = []
    self.free_workers = None
def next_free_worker(self):
    """ Get the next free connection.

        Blocks (inside the generator) until a connection is available.
    """
    return next(self.free_workers)
def _yield_free_worker(self):
    """ Generator that yields connections as soon as they are free to
        accept a new command.

        NOTE(review): interior lines of this function were missing from
        the mangled source; the reconnect-and-drain logic below is
        reconstructed from the visible fragments — confirm against the
        original implementation.
    """
    ready = self.threads
    command_stat = 0
    while True:
        for thread in ready:
            if thread.is_done():
                command_stat += 1
                yield thread

        if command_stat > self.REOPEN_CONNECTIONS_AFTER:
            # Periodically reconnect everything to avoid server-side
            # resource build-up on long-lived connections.
            for thread in self.threads:
                while not thread.is_done():
                    thread.wait()
                thread.connect()
            command_stat = 0
            ready = self.threads
        else:
            # Wait until at least one connection is ready for writing.
            _, ready, _ = select.select([], self.threads, [])
74 def __exit__(self, exc_type, exc_value, traceback):
79 """ Main indexing routine.
def __init__(self, dsn, tokenizer, num_threads):
    """ Set up an indexer against the database `dsn`, processing with
        `num_threads` parallel connections. `tokenizer` provides the
        name analysis for the indexed places.
    """
    # self.dsn is read by index_full(), update_status_table() and
    # _index(); it must be stored here.
    self.dsn = dsn
    self.tokenizer = tokenizer
    self.num_threads = num_threads
def index_full(self, analyse=True):
    """ Index the complete database. This will first index boundaries
        followed by all other objects. When `analyse` is True, then the
        database will be analysed at the appropriate places to
        ensure that database statistics are updated.
    """
    with connect(self.dsn) as conn:
        conn.autocommit = True

        # Choose the ANALYZE strategy once, up front, so the main
        # sequence below stays free of conditionals.
        if analyse:
            def _analyze():
                with conn.cursor() as cur:
                    cur.execute('ANALYZE')
        else:
            def _analyze():
                pass

        self.index_by_rank(0, 4)
        _analyze()

        self.index_boundaries(0, 30)
        _analyze()

        self.index_by_rank(5, 25)
        _analyze()

        self.index_by_rank(26, 30)
        _analyze()

        self.index_postcodes()
        _analyze()
def index_boundaries(self, minrank, maxrank):
    """ Index only administrative boundaries within the given rank range.
    """
    LOG.warning("Starting indexing boundaries using %s threads",
                self.num_threads)

    # Boundaries only exist for ranks 4..25; clamp the requested range.
    for rank in range(max(minrank, 4), min(maxrank, 26)):
        self._index(runners.BoundaryRunner(rank))
def index_by_rank(self, minrank, maxrank):
    """ Index all entries of placex in the given rank range (inclusive)
        in order of their address rank.

        When rank 30 is requested then also interpolations and
        places with address rank 0 will be indexed.
    """
    maxrank = min(maxrank, 30)
    LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                minrank, maxrank, self.num_threads)

    for rank in range(max(1, minrank), maxrank):
        self._index(runners.RankRunner(rank))

    if maxrank == 30:
        # Rank 0 places and interpolations are handled together with
        # the final rank-30 pass, using larger batches.
        self._index(runners.RankRunner(0))
        self._index(runners.InterpolationRunner(), 20)
        self._index(runners.RankRunner(30), 20)
    else:
        self._index(runners.RankRunner(maxrank))
def index_postcodes(self):
    """ Index the entries of the location_postcode table.
    """
    LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

    # Postcodes are cheap per object, so use a larger batch size.
    self._index(runners.PostcodeRunner(), 20)
def update_status_table(self):
    """ Update the status in the status table to 'indexed'.
    """
    with connect(self.dsn) as conn:
        with conn.cursor() as cur:
            cur.execute('UPDATE import_status SET indexed = true')

        # autocommit is not enabled here, so an explicit commit is
        # required or the UPDATE would be rolled back on close.
        conn.commit()
def _index(self, runner, batch=1):
    """ Index a single rank or table. `runner` describes the SQL to use
        for indexing. `batch` describes the number of objects that
        should be processed with a single SQL statement.

        NOTE(review): several interior lines were missing from the
        mangled source; the loop/commit structure below is reconstructed
        from the visible fragments — confirm against the original.
    """
    LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

    with connect(self.dsn) as conn:
        with conn.cursor() as cur:
            total_tuples = cur.scalar(runner.sql_count_objects())
            LOG.debug("Total number of rows: %i", total_tuples)

        conn.commit()

        progress = ProgressLogger(runner.name(), total_tuples)

        if total_tuples > 0:
            # Server-side (named) cursor so the full result set is not
            # materialised in client memory.
            with conn.cursor(name='places') as cur:
                cur.execute(runner.sql_get_objects())

                with WorkerPool(self.dsn, self.num_threads) as pool:
                    while True:
                        # fetchmany() already returns a list; no need to
                        # copy it through a comprehension.
                        places = cur.fetchmany(batch)
                        if not places:
                            break

                        LOG.debug("Processing places: %s", str(places))
                        worker = pool.next_free_worker()

                        runner.index_places(worker, places)
                        progress.add(len(places))

                    pool.finish_all()

            conn.commit()

    progress.done()