]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
indexer: move runner into separate file
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 import logging
5 import select
6
7 import psycopg2
8
9 from nominatim.indexer.progress import ProgressLogger
10 from nominatim.indexer import runners
11 from nominatim.db.async_connection import DBConnection
12
13 LOG = logging.getLogger()
14
15
16 def _analyse_db_if(conn, condition):
17     if condition:
18         with conn.cursor() as cur:
19             cur.execute('ANALYSE')
20
21
class Indexer:
    """ Main indexing routine.
    """

    def __init__(self, dsn, num_threads):
        self.dsn = dsn
        self.num_threads = num_threads
        # Synchronous connection used for reading the object lists.
        self.conn = None
        # Asynchronous worker connections, one per indexing thread.
        self.threads = []


    def _setup_connections(self):
        """ Open the reader connection and one asynchronous worker
            connection per configured thread.
        """
        self.conn = psycopg2.connect(self.dsn)
        self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)]


    def _close_connections(self):
        """ Close all connections opened by `_setup_connections()`.
            Safe to call even when no connections are open.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

        for thread in self.threads:
            thread.close()
        self.threads = []


    def index_full(self, analyse=True):
        """ Index the complete database. This will first index boundaries
            followed by all other objects. When `analyse` is True, then the
            database will be analysed at the appropriate places to
            ensure that database statistics are updated.
        """
        conn = psycopg2.connect(self.dsn)
        # ANALYSE must not run inside a transaction block.
        conn.autocommit = True

        try:
            self.index_by_rank(0, 4)
            _analyse_db_if(conn, analyse)

            self.index_boundaries(0, 30)
            _analyse_db_if(conn, analyse)

            self.index_by_rank(5, 25)
            _analyse_db_if(conn, analyse)

            self.index_by_rank(26, 30)
            _analyse_db_if(conn, analyse)

            self.index_postcodes()
            _analyse_db_if(conn, analyse)
        finally:
            conn.close()


    def index_boundaries(self, minrank, maxrank):
        """ Index only administrative boundaries within the given rank range.
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    self.num_threads)

        self._setup_connections()

        try:
            # Boundaries only exist for ranks 4..25.
            for rank in range(max(minrank, 4), min(maxrank, 26)):
                self.index(runners.BoundaryRunner(rank))
        finally:
            self._close_connections()

    def index_by_rank(self, minrank, maxrank):
        """ Index all entries of placex in the given rank range (inclusive)
            in order of their address rank.

            When rank 30 is requested then also interpolations and
            places with address rank 0 will be indexed.
        """
        maxrank = min(maxrank, 30)
        LOG.warning("Starting indexing rank (%i to %i) using %s threads",
                    minrank, maxrank, self.num_threads)

        self._setup_connections()

        try:
            for rank in range(max(1, minrank), maxrank):
                self.index(runners.RankRunner(rank))

            if maxrank == 30:
                # Rank 0 and interpolations are indexed together with
                # the final rank-30 run.
                self.index(runners.RankRunner(0))
                self.index(runners.InterpolationRunner(), 20)
                self.index(runners.RankRunner(30), 20)
            else:
                self.index(runners.RankRunner(maxrank))
        finally:
            self._close_connections()


    def index_postcodes(self):
        """Index the entries of the location_postcode table.
        """
        LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)

        self._setup_connections()

        try:
            self.index(runners.PostcodeRunner(), 20)
        finally:
            self._close_connections()

    def update_status_table(self):
        """ Update the status in the status table to 'indexed'.
        """
        conn = psycopg2.connect(self.dsn)

        try:
            # `with` guarantees the cursor is closed even when execute fails.
            with conn.cursor() as cur:
                cur.execute('UPDATE import_status SET indexed = true')

            conn.commit()
        finally:
            conn.close()

    def index(self, obj, batch=1):
        """ Index a single rank or table. `obj` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement
        """
        LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)

        # Context managers ensure the cursors are closed on error paths
        # as well (the previous explicit close() leaked on exceptions).
        with self.conn.cursor() as cur:
            cur.execute(obj.sql_count_objects())
            total_tuples = cur.fetchone()[0]

        LOG.debug("Total number of rows: %i", total_tuples)

        progress = ProgressLogger(obj.name(), total_tuples)

        if total_tuples > 0:
            # Server-side (named) cursor so the full result set is not
            # materialised in client memory at once.
            with self.conn.cursor(name='places') as cur:
                cur.execute(obj.sql_get_objects())

                next_thread = self.find_free_thread()
                while True:
                    places = [p[0] for p in cur.fetchmany(batch)]
                    if not places:
                        break

                    LOG.debug("Processing places: %s", str(places))
                    thread = next(next_thread)

                    thread.perform(obj.sql_index_place(places))
                    progress.add(len(places))

            # Wait for all workers to finish their outstanding statements.
            for thread in self.threads:
                thread.wait()

        progress.done()

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.
        """
        ready = self.threads
        command_stat = 0

        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            # refresh the connections occasionaly to avoid potential
            # memory leaks in Postgresql.
            if command_stat > 100000:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                # Block until at least one worker connection is readable.
                ready, _, _ = select.select(self.threads, [], [])

        assert False, "Unreachable code"