]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/indexer.py
move indexing function into its own Python module
[nominatim.git] / nominatim / indexer / indexer.py
1 """
2 Main work horse for indexing (computing addresses) the database.
3 """
4 # pylint: disable=C0111
5 import logging
6 import select
7
8 from .progress import ProgressLogger
9 from db.async_connection import DBConnection, make_connection
10
# Logger shared by everything in this module. getLogger() without a name
# returns the root logger, so messages follow whatever level and handlers
# are set on the root logger.
LOG = logging.getLogger(name=None)
12
class RankRunner:
    """ Produces the SQL needed to index all placex rows of a single
        address rank (`rank_address`) that are still marked for indexing
        (indexed_status > 0).
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        """ Human-readable label of this runner, used in log messages.
        """
        return "rank {rank}".format(rank=self.rank)

    def sql_count_objects(self):
        """ SQL counting the placex rows of this rank that need indexing.
        """
        query = """SELECT count(*) FROM placex
                  WHERE rank_address = {rank} and indexed_status > 0
                """
        return query.format(rank=self.rank)

    def sql_get_objects(self):
        """ SQL selecting the place_ids of this rank that need indexing,
            ordered by geometry_sector.
        """
        query = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_address = {rank}
                  ORDER BY geometry_sector"""
        return query.format(rank=self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ SQL resetting indexed_status to 0 for the given place_ids.

            `ids` is expected to be non-empty: an empty sequence would
            produce `IN ()`, which is not valid SQL.
        """
        template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return template.format(','.join(map(str, ids)))
37
38
class InterpolationRunner:
    """ Produces the SQL needed to index the address interpolation lines
        kept in the table location_property_osmline. It takes no
        parameters, so every method is a static method.
    """

    @staticmethod
    def name():
        """ Human-readable label of this runner, used in log messages.
        """
        return "interpolation lines (location_property_osmline)"

    @staticmethod
    def sql_count_objects():
        """ SQL counting interpolation lines with indexed_status > 0.
        """
        sql = """SELECT count(*) FROM location_property_osmline
                  WHERE indexed_status > 0"""
        return sql

    @staticmethod
    def sql_get_objects():
        """ SQL selecting the place_ids of interpolation lines that need
            indexing, ordered by geometry_sector.
        """
        sql = """SELECT place_id FROM location_property_osmline
                  WHERE indexed_status > 0
                  ORDER BY geometry_sector"""
        return sql

    @staticmethod
    def sql_index_place(ids):
        """ SQL resetting indexed_status to 0 for the given place_ids.

            `ids` is expected to be non-empty: an empty sequence would
            produce `IN ()`, which is not valid SQL.
        """
        id_list = ','.join(map(str, ids))
        template = """UPDATE location_property_osmline
                  SET indexed_status = 0 WHERE place_id IN ({})"""
        return template.format(id_list)
64
class BoundaryRunner:
    """ Produces the SQL needed to index the administrative boundaries
        (class 'boundary', type 'administrative') of a single search
        rank (`rank_search`) that are still marked for indexing.
    """

    def __init__(self, rank):
        self.rank = rank

    def name(self):
        """ Human-readable label of this runner, used in log messages.
        """
        return "boundaries rank {rank}".format(rank=self.rank)

    def sql_count_objects(self):
        """ SQL counting the boundaries of this rank that need indexing.
        """
        query = """SELECT count(*) FROM placex
                  WHERE indexed_status > 0
                    AND rank_search = {rank}
                    AND class = 'boundary' and type = 'administrative'"""
        return query.format(rank=self.rank)

    def sql_get_objects(self):
        """ SQL selecting the place_ids of the boundaries of this rank that
            need indexing, ordered by partition and then admin_level.
        """
        query = """SELECT place_id FROM placex
                  WHERE indexed_status > 0 and rank_search = {rank}
                        and class = 'boundary' and type = 'administrative'
                  ORDER BY partition, admin_level"""
        return query.format(rank=self.rank)

    @staticmethod
    def sql_index_place(ids):
        """ SQL resetting indexed_status to 0 for the given place_ids.

            `ids` is expected to be non-empty: an empty sequence would
            produce `IN ()`, which is not valid SQL.
        """
        template = "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"
        return template.format(','.join(map(str, ids)))
92
class Indexer:
    """ Main indexing routine.

        Uses one connection (`conn`) to count and fetch the objects that
        need indexing, and a pool of worker connections (`threads`, from
        db.async_connection) that run the per-object UPDATE statements.
    """

    def __init__(self, opts):
        self.minrank = max(1, opts.minrank)
        self.maxrank = min(30, opts.maxrank)
        self.conn = make_connection(opts)
        # Always open at least one worker connection. With an empty pool,
        # find_free_thread() would call select() on an empty list without a
        # timeout, which blocks forever on POSIX systems (and raises on
        # Windows) instead of indexing anything.
        num_threads = max(1, opts.threads)
        self.threads = [DBConnection(opts) for _ in range(num_threads)]

    def index_boundaries(self):
        """ Index administrative boundaries one search rank at a time.

            Processes ranks from max(minrank, 5) up to, but not including,
            min(maxrank, 26).
        """
        LOG.warning("Starting indexing boundaries using %s threads",
                    len(self.threads))

        # NOTE(review): the upper bound is exclusive, so when maxrank < 26
        # the rank `maxrank` itself is skipped here, whereas index_by_rank()
        # does process maxrank. Confirm this asymmetry is intended.
        for rank in range(max(self.minrank, 5), min(self.maxrank, 26)):
            self.index(BoundaryRunner(rank))

    def index_by_rank(self):
        """ Run classic indexing by rank.

            Indexes address ranks minrank to maxrank - 1 one after another.
            When maxrank is 30, it then indexes rank 0, the interpolation
            lines and finally rank 30 (the last two with a batch size of 20).
            Otherwise it finishes with rank maxrank.
        """
        LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                    self.minrank, self.maxrank, len(self.threads))

        for rank in range(max(1, self.minrank), self.maxrank):
            self.index(RankRunner(rank))

        if self.maxrank == 30:
            self.index(RankRunner(0))
            self.index(InterpolationRunner(), 20)
            self.index(RankRunner(self.maxrank), 20)
        else:
            self.index(RankRunner(self.maxrank))

    def index(self, obj, batch=1):
        """ Index a single rank or table. `obj` describes the SQL to use
            for indexing. `batch` describes the number of objects that
            should be processed with a single SQL statement.

            `obj` must provide name(), sql_count_objects(), sql_get_objects()
            and sql_index_place(ids), like the *Runner classes of this module.
        """
        LOG.warning("Starting %s (using batch size %s)", obj.name(), batch)

        cur = self.conn.cursor()
        cur.execute(obj.sql_count_objects())

        total_tuples = cur.fetchone()[0]
        LOG.debug("Total number of rows: %i", total_tuples)

        cur.close()

        progress = ProgressLogger(obj.name(), total_tuples)

        if total_tuples > 0:
            # Named cursor. Assuming a psycopg2 connection (not visible
            # here), this is a server-side cursor, so fetchmany() streams
            # the ids instead of loading the whole result at once.
            cur = self.conn.cursor(name='places')
            cur.execute(obj.sql_get_objects())

            next_thread = self.find_free_thread()
            while True:
                places = [p[0] for p in cur.fetchmany(batch)]
                if not places:
                    break

                LOG.debug("Processing places: %s", str(places))
                thread = next(next_thread)

                thread.perform(obj.sql_index_place(places))
                progress.add(len(places))

            cur.close()

            # Let every worker connection finish its outstanding statement
            # before reporting this step as done.
            for thread in self.threads:
                thread.wait()

        progress.done()

    def find_free_thread(self):
        """ Generator that returns the next connection that is free for
            sending a query.

            Connections reporting is_done() are handed out first. Otherwise
            select() blocks until at least one connection is readable, and
            those connections are checked next. Once more than 100000
            connections have been handed out, all connections are drained
            and reconnected.
        """
        ready = self.threads
        command_stat = 0

        while True:
            for thread in ready:
                if thread.is_done():
                    command_stat += 1
                    yield thread

            # Refresh the connections occasionally to avoid potential
            # memory leaks in PostgreSQL.
            if command_stat > 100000:
                for thread in self.threads:
                    while not thread.is_done():
                        thread.wait()
                    thread.connect()
                command_stat = 0
                ready = self.threads
            else:
                ready, _, _ = select.select(self.threads, [], [])

        assert False, "Unreachable code"