# nominatim/indexer/indexer.py
"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
import select

from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
from nominatim.db.async_connection import DBConnection
from nominatim.db.connection import connect

LOG = logging.getLogger()

14
class Indexer:
    """ Main indexing routine.

        Indexing computes search rankings and addresses for the entries of
        the placex and location_postcode tables. The work is spread over a
        pool of asynchronous database connections.
    """

    def __init__(self, dsn, num_threads):
        self.dsn = dsn                  # PostgreSQL connection string
        self.num_threads = num_threads  # number of parallel connections to use
        self.threads = []               # pool of open DBConnection objects


    def _setup_connections(self):
        """ Open one asynchronous database connection per worker. """
        self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]


    def _close_connections(self):
        """ Close all connections in the pool and empty it. """
        for thread in self.threads:
            thread.close()
        self.threads = []


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        with connect(self.dsn) as conn:
            conn.autocommit = True

            if analyse:
                def _analyse():
                    with conn.cursor() as cur:
                        # NOTE: 'ANALYSE' is the accepted alternative
                        # spelling of ANALYZE in PostgreSQL.
                        cur.execute('ANALYSE')
            else:
                def _analyse():
                    pass

            self.index_by_rank(0, 4)
            _analyse()

            self.index_boundaries(0, 30)
            _analyse()

            self.index_by_rank(5, 25)
            _analyse()

            self.index_by_rank(26, 30)
            _analyse()

            self.index_postcodes()
            _analyse()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        self._setup_connections()

        try:
            # Boundaries are only indexed for ranks 4 to 25 (upper bound
            # exclusive here), whatever range the caller asked for.
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self._index(runners.BoundaryRunner(rank))
        finally:
            self._close_connections()

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    minrank, maxrank, self.num_threads)

        self._setup_connections()

        try:
            for rank in range(max(1, minrank), maxrank):
                self._index(runners.RankRunner(rank))

            if maxrank == 30:
                # Rank 0 objects and the numerous rank-30 objects are done
                # last; interpolations and rank 30 use a larger batch size.
                self._index(runners.RankRunner(0))
                self._index(runners.InterpolationRunner(), 20)
                self._index(runners.RankRunner(30), 20)
            else:
                self._index(runners.RankRunner(maxrank))
        finally:
            self._close_connections()


    def index_postcodes(self):
        """ Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._setup_connections()

        try:
            self._index(runners.PostcodeRunner(), 20)
        finally:
            self._close_connections()

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()

    def _index(self, runner, batch=1):
        """ Index a single rank or table. `runner` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.
        """
        LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)

        with connect(self.dsn) as conn:
            with conn.cursor() as cur:
                total_tuples = cur.scalar(runner.sql_count_objects())
                LOG.debug("Total number of rows: %i", total_tuples)

            conn.commit()

            progress = ProgressLogger(runner.name(), total_tuples)

            if total_tuples > 0:
                # A named cursor makes this a server-side cursor, so the
                # full result set is never materialised in client memory.
                with conn.cursor(name='places') as cur:
                    cur.execute(runner.sql_get_objects())

                    next_thread = self.find_free_thread()
                    while True:
                        places = [p[0] for p in cur.fetchmany(batch)]
                        if not places:
                            break

                        LOG.debug("Processing places: %s", str(places))
                        thread = next(next_thread)

                        thread.perform(runner.sql_index_place(places))
                        progress.add(len(places))

            conn.commit()

        # Wait for any indexing queries that are still in flight on the
        # worker connections before reporting completion.
        for thread in self.threads:
            thread.wait()

        progress.done()

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.
        """
        ready = self.threads
        command_stat = 0

        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            # refresh the connections occasionally to avoid potential
            # memory leaks in Postgresql.
            if command_stat > 100000:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                # Block until at least one connection socket signals that
                # its query has finished.
                ready, _, _ = select.select(self.threads, [], [])

        assert False, "Unreachable code"