]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
643b49d28efa7ca4864d3092791dc0485bb855a1
[nominatim.git] / nominatim / index.c
1 /*
2 */
3
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <assert.h>
9 #include <pthread.h>
10 #include <time.h>
11 #include <stdint.h>
12
13 #include <libpq-fe.h>
14
15 #include "nominatim.h"
16 #include "index.h"
17 #include "export.h"
18 #include "postgresql.h"
19
20 extern int verbose;
21
22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
23 {
24         struct index_thread_data * thread_data;
25         pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
26         int tuples, count, sleepcount;
27
28         time_t rankStartTime;
29         int rankTotalTuples;
30         int rankCountTuples;
31         float rankPerSecond;
32
33         PGconn *conn;
34         PGresult * res;
35         PGresult * resSectors;
36         PGresult * resPlaces;
37         PGresult * resNULL;
38
39         int rank;
40         int i;
41         int iSector;
42         int iResult;
43
44     const char *paramValues[2];
45     int         paramLengths[2];
46     int         paramFormats[2];
47     uint32_t    paramRank;
48     uint32_t    paramSector;
49     uint32_t    sector;
50
51     xmlTextWriterPtr writer;
52         pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
53
54     Oid pg_prepare_params[2];
55
56     conn = PQconnectdb(conninfo);
57     if (PQstatus(conn) != CONNECTION_OK) {
58         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
59         exit(EXIT_FAILURE);
60     }
61
62     pg_prepare_params[0] = PG_OID_INT4;
63     res = PQprepare(conn, "index_sectors",
64         "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
65         1, pg_prepare_params);
66     if (PQresultStatus(res) != PGRES_COMMAND_OK)
67     {
68         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
69         exit(EXIT_FAILURE);
70     }
71     PQclear(res);
72
73     pg_prepare_params[0] = PG_OID_INT4;
74     res = PQprepare(conn, "index_nosectors",
75         "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
76         1, pg_prepare_params);
77     if (PQresultStatus(res) != PGRES_COMMAND_OK)
78     {
79         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
80         exit(EXIT_FAILURE);
81     }
82     PQclear(res);
83
84     pg_prepare_params[0] = PG_OID_INT4;
85     pg_prepare_params[1] = PG_OID_INT4;
86     res = PQprepare(conn, "index_sector_places",
87         "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
88         2, pg_prepare_params);
89     if (PQresultStatus(res) != PGRES_COMMAND_OK)
90     {
91         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
92         exit(EXIT_FAILURE);
93     }
94     PQclear(res);
95
96     pg_prepare_params[0] = PG_OID_INT4;
97     res = PQprepare(conn, "index_nosector_places",
98         "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
99         1, pg_prepare_params);
100     if (PQresultStatus(res) != PGRES_COMMAND_OK)
101     {
102         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
103         exit(EXIT_FAILURE);
104     }
105     PQclear(res);
106
107     // Build the data for each thread
108     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
109         for (i = 0; i < num_threads; i++)
110         {
111                 thread_data[i].conn = PQconnectdb(conninfo);
112             if (PQstatus(thread_data[i].conn) != CONNECTION_OK) {
113                 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
114                 exit(EXIT_FAILURE);
115             }
116
117             pg_prepare_params[0] = PG_OID_INT4;
118             res = PQprepare(thread_data[i].conn, "index_placex",
119                 "update placex set indexed_status = 0 where place_id = $1",
120                 1, pg_prepare_params);
121             if (PQresultStatus(res) != PGRES_COMMAND_OK)
122             {
123                 fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
124                 exit(EXIT_FAILURE);
125             }
126             PQclear(res);
127
128                 res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
129                 if (PQresultStatus(res) != PGRES_COMMAND_OK)
130             {
131                 fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
132                 exit(EXIT_FAILURE);
133             }
134             PQclear(res);
135
136             nominatim_exportCreatePreparedQueries(thread_data[i].conn);
137         }
138
139         // Create the output file
140         writer = NULL;
141         if (structuredoutputfile)
142         {
143                 writer = nominatim_exportXMLStart(structuredoutputfile);
144         }
145
146     fprintf(stderr, "Starting indexing rank (%i to %i) using %i treads\n", rank_min, rank_max, num_threads);
147
148     for (rank = rank_min; rank <= rank_max; rank++)
149     {
150         printf("Starting rank %d\n", rank);
151         rankCountTuples = 0;
152         rankPerSecond = 0;
153
154         paramRank = PGint32(rank);
155         paramValues[0] = (char *)&paramRank;
156         paramLengths[0] = sizeof(paramRank);
157         paramFormats[0] = 1;
158                 if (rank < 16)
159                 resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
160                 else
161                 resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
162         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
163         {
164             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
165             PQclear(resSectors);
166             exit(EXIT_FAILURE);
167         }
168                 if (PQftype(resSectors, 0) != PG_OID_INT4)
169                 {
170             fprintf(stderr, "Sector value has unexpected type\n");
171             PQclear(resSectors);
172             exit(EXIT_FAILURE);
173                 }
174                 if (PQftype(resSectors, 1) != PG_OID_INT8)
175                 {
176             fprintf(stderr, "Sector value has unexpected type\n");
177             PQclear(resSectors);
178             exit(EXIT_FAILURE);
179                 }
180
181                 rankTotalTuples = 0;
182         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
183         {
184                 rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
185         }
186
187         rankStartTime = time(0);
188         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
189         {
190                         if (iSector > 0)
191                         {
192                                 resPlaces = PQgetResult(conn);
193                     if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
194                         {
195                             fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
196                         PQclear(resPlaces);
197                             exit(EXIT_FAILURE);
198                     }
199                                 if (PQftype(resPlaces, 0) != PG_OID_INT4)
200                                 {
201                             fprintf(stderr, "Place_id value has unexpected type\n");
202                         PQclear(resPlaces);
203                             exit(EXIT_FAILURE);
204                                 }
205                                 resNULL = PQgetResult(conn);
206                                 if (resNULL != NULL)
207                                 {
208                                         fprintf(stderr, "Unexpected non-null response\n");
209                                         exit(EXIT_FAILURE);
210                                 }
211                         }
212
213                         if (iSector < PQntuples(resSectors))
214                         {
215                                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
216                         //printf("\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
217
218                                 // Get all the place_id's for this sector
219                             paramRank = PGint32(rank);
220                         paramValues[0] = (char *)&paramRank;
221                         paramLengths[0] = sizeof(paramRank);
222                             paramFormats[0] = 1;
223                         paramSector = PGint32(sector);
224                     paramValues[1] = (char *)&paramSector;
225                                 paramLengths[1] = sizeof(paramSector);
226                             paramFormats[1] = 1;
227                                 if (rank < 16)
228                             iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
229                                 else
230                                 iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
231                                 if (!iResult)
232                                 {
233                             fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
234                         PQclear(resPlaces);
235                             exit(EXIT_FAILURE);
236                                 }
237                         }
238
239                         if (iSector > 0)
240                         {
241                                 count = 0;
242                                 rankPerSecond = 0;
243                                 tuples = PQntuples(resPlaces);
244
245                                 if (tuples > 0)
246                         {
247                                         // Spawn threads
248                                         for (i = 0; i < num_threads; i++)
249                                         {
250                                                 thread_data[i].res = resPlaces;
251                                                 thread_data[i].tuples = tuples;
252                                                 thread_data[i].count = &count;
253                                                 thread_data[i].count_mutex = &count_mutex;
254                                                 thread_data[i].writer = writer;
255                                                 thread_data[i].writer_mutex = &writer_mutex;
256                                                 pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
257                                         }
258
259                                         // Monitor threads to give user feedback
260                                         sleepcount = 0;
261                                         while(count < tuples)
262                                         {
263                                                 usleep(1000);
264
265                                                 // Aim for one update per second
266                                                 if (sleepcount++ > 500)
267                                                 {
268                                                         rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
269                                                         printf("  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
270                                                         sleepcount = 0;
271                                                 }
272                                         }
273
274                                         // Wait for everything to finish
275                                         for (i = 0; i < num_threads; i++)
276                                         {
277                                                 pthread_join(thread_data[i].thread, NULL);
278                                         }
279
280                                         rankCountTuples += tuples;
281                                 }
282
283                                 // Finished sector
284                                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
285                                 printf("  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
286
287                     PQclear(resPlaces);
288                 }
289         }
290         // Finished rank
291                 printf("\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
292
293         PQclear(resSectors);
294     }
295
296     if (writer)
297     {
298         nominatim_exportXMLEnd(writer);
299     }
300 }
301
302 void *nominatim_indexThread(void * thread_data_in)
303 {
304         struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
305
306         PGresult   *res;
307
308     const char *paramValues[1];
309     int         paramLengths[1];
310     int         paramFormats[1];
311     uint32_t    paramPlaceID;
312     uint32_t    place_id;
313         time_t          updateStartTime;
314
315         while(1)
316         {
317                 pthread_mutex_lock( thread_data->count_mutex );
318                 if (*(thread_data->count) >= thread_data->tuples)
319                 {
320                         pthread_mutex_unlock( thread_data->count_mutex );
321                         break;
322                 }
323
324                 place_id = PGint32(*((uint32_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
325                 (*thread_data->count)++;
326
327                 pthread_mutex_unlock( thread_data->count_mutex );
328
329                 if (verbose) printf("  Processing place_id %d\n", place_id);
330                 
331                 updateStartTime = time(0);
332                 paramPlaceID = PGint32(place_id);
333         paramValues[0] = (char *)&paramPlaceID;
334         paramLengths[0] = sizeof(paramPlaceID);
335         paramFormats[0] = 1;
336         res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
337         if (PQresultStatus(res) != PGRES_COMMAND_OK)
338         {
339             fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
340             PQclear(res);
341             exit(EXIT_FAILURE);
342         }
343         PQclear(res);
344                 if (difftime(time(0), updateStartTime) > 1) printf("  Slow place_id %d\n", place_id);
345
346         if (thread_data->writer)
347         {
348                 nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex);
349         }
350         }
351
352         return NULL;
353 }