# Source: git.openstreetmap.org nominatim.git — nominatim/indexer/indexer.py
# Commit: make index() function private
"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
import select

import psycopg2

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection

# Root logger; indexing progress is reported at WARNING level so it is
# visible under the default configuration.
LOG = logging.getLogger()
15
class Indexer:
    """ Main indexing routine.

        Computes addresses for database objects by running the appropriate
        indexing SQL over a pool of asynchronous worker connections.
    """

    def __init__(self, dsn, num_threads):
        # dsn: libpq connection string for the target database.
        # num_threads: number of asynchronous worker connections to use.
        self.dsn = dsn
        self.num_threads = num_threads
        self.conn = None      # synchronous control connection (select/count)
        self.threads = []     # asynchronous worker connections


    def _setup_connections(self):
        """ Open the control connection and one async worker connection
            per configured thread.
        """
        self.conn = psycopg2.connect(self.dsn)
        self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]


    def _close_connections(self):
        """ Close the control connection and all worker connections.
            Safe to call when nothing is open.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

        for thread in self.threads:
            thread.close()
        self.threads = []


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        # Keep a dedicated autocommit connection for ANALYSE. Note that
        # psycopg2's `with connect()` only ends the transaction and never
        # closes the connection, so close explicitly in a finally block.
        conn = psycopg2.connect(self.dsn)

        try:
            conn.autocommit = True

            if analyse:
                def _analyse():
                    with conn.cursor() as cur:
                        cur.execute('ANALYSE')
            else:
                def _analyse():
                    pass

            self.index_by_rank(0, 4)
            _analyse()

            self.index_boundaries(0, 30)
            _analyse()

            self.index_by_rank(5, 25)
            _analyse()

            self.index_by_rank(26, 30)
            _analyse()

            self.index_postcodes()
            _analyse()
        finally:
            conn.close()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        self._setup_connections()

        try:
            # Boundaries only exist for ranks 4 to 25, so clamp the range.
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank))
        finally:
            self._close_connections()

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        self._setup_connections()

        try:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank))

            if maxrank == 30:
                # Rank 0 objects and interpolations are only indexed
                # together with the final rank-30 pass.
                self._index(runners.RankRunner(0))
                self._index(runners.InterpolationRunner(), 20)
                self._index(runners.RankRunner(30), 20)
            else:
                self._index(runners.RankRunner(maxrank))
        finally:
            self._close_connections()


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._setup_connections()

        try:
            self._index(runners.PostcodeRunner(), 20)
        finally:
            self._close_connections()

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        conn = psycopg2.connect(self.dsn)

        try:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()
        finally:
            conn.close()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with self.conn.cursor() as cur:
            cur.execute(runner.sql_count_objects())
            total_tuples = cur.fetchone()[0]

        LOG.debug("Total number of rows: %i", total_tuples)

        progress = ProgressLogger(runner.name(), total_tuples)

        if total_tuples > 0:
            # Use a server-side (named) cursor so the result set is
            # streamed instead of being loaded into memory at once.
            with self.conn.cursor(name='places') as cur:
                cur.execute(runner.sql_get_objects())

                next_thread = self.find_free_thread()
                while True:
                    places = [p[0] for p in cur.fetchmany(batch)]
                    if not places:
                        break

                    LOG.debug("Processing places: %s", str(places))
                    thread = next(next_thread)

                    thread.perform(runner.sql_index_place(places))
                    progress.add(len(places))

            # Wait for the workers to finish their last batch.
            for thread in self.threads:
                thread.wait()

        progress.done()

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.
        """
        ready = self.threads
        command_stat = 0

        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            # refresh the connections occasionally to avoid potential
            # memory leaks in Postgresql.
            if command_stat > 100000:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                # Block until at least one worker connection is readable
                # (i.e. has finished its current query).
                ready, _, _ = select.select(self.threads, [], [])

        assert False, "Unreachable code"