Main workhorse for indexing the database, i.e. computing the addresses of places.
9 from nominatim.indexer.progress import ProgressLogger
10 from nominatim.indexer import runners
11 from nominatim.db.async_connection import DBConnection
# Logger for the indexer's status and debug messages. No name is passed to
# getLogger(), so this is the root logger.
LOG = logging.getLogger(name=None)
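""" Main indexing routine. Indexes places, boundaries and postcodes by
    distributing the indexing SQL over a pool of database connections.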
def __init__(self, dsn, num_threads):
    """ Create a new indexer.

        `dsn` is the connection string used for the main psycopg2
        connection and for every worker DBConnection. `num_threads` is
        the number of worker connections opened for each indexing step.
    """
    self.dsn = dsn
    self.num_threads = num_threads
    # Connections only exist between _setup_connections() and
    # _close_connections(). Start with nothing open so that
    # _close_connections() is safe to call at any time.
    self.conn = None
    self.threads = []
def _setup_connections(self):
    """ Open the connections needed for one indexing step.

        `self.conn` becomes a plain psycopg2 connection. `self.threads`
        becomes a list of `self.num_threads` DBConnection workers, all
        created with the same DSN.
    """
    dsn, worker_count = self.dsn, self.num_threads
    self.conn = psycopg2.connect(dsn)
    # One worker per requested thread, created in order.
    self.threads = list(map(DBConnection, [dsn] * worker_count))
def _close_connections(self):
    """ Close all connections opened by `_setup_connections()`.

        Closes the main connection (if one is open) and every worker
        connection, then resets the state so that calling this method
        again is a no-op.
    """
    if self.conn is not None:
        self.conn.close()
        self.conn = None

    for thread in self.threads:
        thread.close()
    self.threads = []
def index_full(self, analyse=True):
    """ Index the complete database.

        Indexing runs in this order: places up to rank 4, administrative
        boundaries, ranks 5-25, ranks 26-30, and finally postcodes.
        When `analyse` is True, the database is analysed after each of
        these steps so that database statistics stay up to date.
    """
    # Using a psycopg2 connection as a context manager only ends the
    # transaction; it does not close the connection. Close it explicitly.
    conn = psycopg2.connect(self.dsn)
    try:
        # ANALYSE must not run inside an open transaction block.
        conn.autocommit = True

        def _analyse():
            if analyse:
                with conn.cursor() as cur:
                    cur.execute('ANALYSE')

        self.index_by_rank(0, 4)
        _analyse()

        self.index_boundaries(0, 30)
        _analyse()

        self.index_by_rank(5, 25)
        _analyse()

        self.index_by_rank(26, 30)
        _analyse()

        self.index_postcodes()
        _analyse()
    finally:
        conn.close()
def index_boundaries(self, minrank, maxrank):
    """ Index only administrative boundaries within the given rank range.

        The range is clipped to ranks 4-25. `maxrank` itself is NOT
        included.
        NOTE(review): index_by_rank() treats its upper bound as inclusive;
        confirm that the exclusive bound here is intended.
    """
    LOG.warning("Starting indexing boundaries using %s threads",
                self.num_threads)

    self._setup_connections()
    # Always release the connections, even when indexing fails.
    try:
        for rank in range(max(minrank, 4), min(maxrank, 26)):
            self._index(runners.BoundaryRunner(rank))
    finally:
        self._close_connections()
def index_by_rank(self, minrank, maxrank):
    """ Index all entries of placex in the given rank range (inclusive)
        in order of their address rank.

        `maxrank` is capped at 30. When rank 30 is requested, then
        interpolations and places with address rank 0 are also indexed.
    """
    maxrank = min(maxrank, 30)
    LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                minrank, maxrank, self.num_threads)

    self._setup_connections()
    # Always release the connections, even when indexing fails.
    try:
        # Rank 0 is skipped here; it is indexed together with rank 30.
        for rank in range(max(1, minrank), maxrank):
            self._index(runners.RankRunner(rank))

        if maxrank == 30:
            self._index(runners.RankRunner(0))
            # Batches of 20 objects per SQL statement.
            self._index(runners.InterpolationRunner(), 20)
            self._index(runners.RankRunner(30), 20)
        else:
            self._index(runners.RankRunner(maxrank))
    finally:
        self._close_connections()
def index_postcodes(self):
    """ Index the entries of the location_postcode table.

        Postcodes are processed in batches of 20 objects per SQL
        statement.
    """
    LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

    self._setup_connections()
    # Always release the connections, even when indexing fails.
    try:
        self._index(runners.PostcodeRunner(), 20)
    finally:
        self._close_connections()
def update_status_table(self):
    """ Update the status in the status table to 'indexed'.

        Uses a short-lived connection of its own. The update is committed
        and the connection is closed before returning, even on error.
    """
    conn = psycopg2.connect(self.dsn)
    try:
        with conn.cursor() as cur:
            cur.execute('UPDATE import_status SET indexed = true')
        # psycopg2 does not autocommit by default; without this the
        # update would be rolled back when the connection closes.
        conn.commit()
    finally:
        conn.close()
def _index(self, runner, batch=1):
    """ Index a single rank or table.

        `runner` describes the SQL to use for indexing. `batch` is the
        number of objects that are processed with a single SQL statement.

        The objects are read through `self.conn`, and the indexing
        statements are distributed over the worker connections in
        `self.threads`. `_setup_connections()` must therefore have been
        called first. The method returns once all workers are finished.
    """
    LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

    # Cursors are used as context managers so that they are closed even
    # when counting or indexing fails half-way.
    with self.conn.cursor() as cur:
        cur.execute(runner.sql_count_objects())
        total_tuples = cur.fetchone()[0]
    LOG.debug("Total number of rows: %i", total_tuples)

    progress = ProgressLogger(runner.name(), total_tuples)

    if total_tuples > 0:
        # A named (server-side) cursor fetches the rows in chunks instead
        # of loading the whole result set at once.
        with self.conn.cursor(name='places') as cur:
            cur.execute(runner.sql_get_objects())

            next_thread = self.find_free_thread()
            while True:
                places = [p[0] for p in cur.fetchmany(batch)]
                if not places:
                    break

                # Let logging format lazily; the output is unchanged.
                LOG.debug("Processing places: %s", places)
                thread = next(next_thread)

                thread.perform(runner.sql_index_place(places))
                progress.add(len(places))

        # Wait for the statements still running on the workers.
        for thread in self.threads:
            thread.wait()

    progress.done()
def find_free_thread(self):
    """ Generator yielding, one at a time, worker connections that are
        idle and can accept the next query.

        After more than 100000 connections have been handed out, all
        workers are drained and reconnected.
    """
    candidates = self.threads
    handed_out = 0

    while True:
        for conn in candidates:
            if conn.is_done():
                handed_out += 1
                yield conn

        if handed_out <= 100000:
            # Block until at least one worker connection becomes readable.
            candidates, _, _ = select.select(self.threads, [], [])
            continue

        # Refresh the connections occasionally to avoid potential
        # memory leaks in PostgreSQL: let every worker finish its
        # current query, then reconnect it.
        for conn in self.threads:
            while not conn.is_done():
                conn.wait()
            conn.connect()
        handed_out = 0
        candidates = self.threads

    assert False, "Unreachable code"