]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/index.c
log progress to stderr instead of stdout, to make use of unbuffered output
[nominatim.git] / nominatim / index.c
1 /*
2 */
3
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <assert.h>
9 #include <pthread.h>
10 #include <time.h>
11 #include <stdint.h>
12
13 #include <libpq-fe.h>
14
15 #include "nominatim.h"
16 #include "index.h"
17 #include "export.h"
18 #include "postgresql.h"
19
20 extern int verbose;
21
22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
23 {
24     struct index_thread_data * thread_data;
25     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
26     int tuples, count, sleepcount;
27
28     time_t rankStartTime;
29     int rankTotalTuples;
30     int rankCountTuples;
31     float rankPerSecond;
32
33     PGconn *conn;
34     PGresult * res;
35     PGresult * resSectors;
36     PGresult * resPlaces;
37     PGresult * resNULL;
38
39     int rank;
40     int i;
41     int iSector;
42     int iResult;
43     int bSkip;
44
45     const char *paramValues[2];
46     int         paramLengths[2];
47     int         paramFormats[2];
48     uint32_t    paramRank;
49     uint32_t    paramSector;
50     uint32_t    sector;
51
52     xmlTextWriterPtr writer;
53     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
54
55     Oid pg_prepare_params[2];
56
57     conn = PQconnectdb(conninfo);
58     if (PQstatus(conn) != CONNECTION_OK)
59     {
60         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
61         exit(EXIT_FAILURE);
62     }
63
64     pg_prepare_params[0] = PG_OID_INT4;
65     res = PQprepare(conn, "index_sectors",
66                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
67                     1, pg_prepare_params);
68     if (PQresultStatus(res) != PGRES_COMMAND_OK)
69     {
70         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
71         exit(EXIT_FAILURE);
72     }
73     PQclear(res);
74
75     pg_prepare_params[0] = PG_OID_INT4;
76     res = PQprepare(conn, "index_nosectors",
77                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
78                     1, pg_prepare_params);
79     if (PQresultStatus(res) != PGRES_COMMAND_OK)
80     {
81         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
82         exit(EXIT_FAILURE);
83     }
84     PQclear(res);
85
86     pg_prepare_params[0] = PG_OID_INT4;
87     pg_prepare_params[1] = PG_OID_INT4;
88     res = PQprepare(conn, "index_sector_places",
89                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
90                     2, pg_prepare_params);
91     if (PQresultStatus(res) != PGRES_COMMAND_OK)
92     {
93         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
94         exit(EXIT_FAILURE);
95     }
96     PQclear(res);
97
98     pg_prepare_params[0] = PG_OID_INT4;
99     res = PQprepare(conn, "index_nosector_places",
100                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
101                     1, pg_prepare_params);
102     if (PQresultStatus(res) != PGRES_COMMAND_OK)
103     {
104         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
105         exit(EXIT_FAILURE);
106     }
107     PQclear(res);
108
109     // Build the data for each thread
110     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
111     for (i = 0; i < num_threads; i++)
112     {
113         thread_data[i].conn = PQconnectdb(conninfo);
114         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
115         {
116             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
117             exit(EXIT_FAILURE);
118         }
119
120         pg_prepare_params[0] = PG_OID_INT4;
121         res = PQprepare(thread_data[i].conn, "index_placex",
122                         "update placex set indexed_status = 0 where place_id = $1",
123                         1, pg_prepare_params);
124         if (PQresultStatus(res) != PGRES_COMMAND_OK)
125         {
126             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
127             exit(EXIT_FAILURE);
128         }
129         PQclear(res);
130
131         res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
132         if (PQresultStatus(res) != PGRES_COMMAND_OK)
133         {
134             fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
135             exit(EXIT_FAILURE);
136         }
137         PQclear(res);
138
139         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
140     }
141
142     // Create the output file
143     writer = NULL;
144     if (structuredoutputfile)
145     {
146         writer = nominatim_exportXMLStart(structuredoutputfile);
147     }
148
149     fprintf(stderr, "Starting indexing rank (%i to %i) using %i treads\n", rank_min, rank_max, num_threads);
150
151     for (rank = rank_min; rank <= rank_max; rank++)
152     {
153         fprintf(stderr, "Starting rank %d\n", rank);
154         rankCountTuples = 0;
155         rankPerSecond = 0;
156
157         paramRank = PGint32(rank);
158         paramValues[0] = (char *)&paramRank;
159         paramLengths[0] = sizeof(paramRank);
160         paramFormats[0] = 1;
161 //        if (rank < 16)
162 //            resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
163 //        else
164         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
165
166         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
167         {
168             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
169             PQclear(resSectors);
170             exit(EXIT_FAILURE);
171         }
172         if (PQftype(resSectors, 0) != PG_OID_INT4)
173         {
174             fprintf(stderr, "Sector value has unexpected type\n");
175             PQclear(resSectors);
176             exit(EXIT_FAILURE);
177         }
178         if (PQftype(resSectors, 1) != PG_OID_INT8)
179         {
180             fprintf(stderr, "Sector value has unexpected type\n");
181             PQclear(resSectors);
182             exit(EXIT_FAILURE);
183         }
184
185         rankTotalTuples = 0;
186         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
187         {
188             rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
189         }
190
191         rankStartTime = time(0);
192         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
193         {
194             if (iSector > 0)
195             {
196                 resPlaces = PQgetResult(conn);
197                 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
198                 {
199                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
200                     PQclear(resPlaces);
201                     exit(EXIT_FAILURE);
202                 }
203                 if (PQftype(resPlaces, 0) != PG_OID_INT4)
204                 {
205                     fprintf(stderr, "Place_id value has unexpected type\n");
206                     PQclear(resPlaces);
207                     exit(EXIT_FAILURE);
208                 }
209                 resNULL = PQgetResult(conn);
210                 if (resNULL != NULL)
211                 {
212                     fprintf(stderr, "Unexpected non-null response\n");
213                     exit(EXIT_FAILURE);
214                 }
215             }
216
217             if (iSector < PQntuples(resSectors))
218             {
219                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
220 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
221
222                 // Get all the place_id's for this sector
223                 paramRank = PGint32(rank);
224                 paramValues[0] = (char *)&paramRank;
225                 paramLengths[0] = sizeof(paramRank);
226                 paramFormats[0] = 1;
227                 paramSector = PGint32(sector);
228                 paramValues[1] = (char *)&paramSector;
229                 paramLengths[1] = sizeof(paramSector);
230                 paramFormats[1] = 1;
231                 if (rankTotalTuples-rankCountTuples < num_threads*1000)
232                 {
233                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
234                 }
235                 else
236                 {
237                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
238                 }
239                 if (!iResult)
240                 {
241                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
242                     PQclear(resPlaces);
243                     exit(EXIT_FAILURE);
244                 }
245             }
246
247             if (iSector > 0)
248             {
249                 count = 0;
250                 rankPerSecond = 0;
251                 tuples = PQntuples(resPlaces);
252
253                 if (tuples > 0)
254                 {
255                     // Spawn threads
256                     for (i = 0; i < num_threads; i++)
257                     {
258                         thread_data[i].res = resPlaces;
259                         thread_data[i].tuples = tuples;
260                         thread_data[i].count = &count;
261                         thread_data[i].count_mutex = &count_mutex;
262                         thread_data[i].writer = writer;
263                         thread_data[i].writer_mutex = &writer_mutex;
264                         pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
265                     }
266
267                     // Monitor threads to give user feedback
268                     sleepcount = 0;
269                     while (count < tuples)
270                     {
271                         usleep(1000);
272
273                         // Aim for one update per second
274                         if (sleepcount++ > 500)
275                         {
276                             rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
277                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
278                             sleepcount = 0;
279                         }
280                     }
281
282                     // Wait for everything to finish
283                     for (i = 0; i < num_threads; i++)
284                     {
285                         pthread_join(thread_data[i].thread, NULL);
286                     }
287
288                     rankCountTuples += tuples;
289                 }
290
291                 // Finished sector
292                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
293                 fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
294
295                 PQclear(resPlaces);
296             }
297             if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
298             {
299                 iSector = PQntuples(resSectors) - 1;
300             }
301         }
302         // Finished rank
303         fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
304
305         PQclear(resSectors);
306     }
307
308     if (writer)
309     {
310         nominatim_exportXMLEnd(writer);
311     }
312 }
313
314 void *nominatim_indexThread(void * thread_data_in)
315 {
316     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
317     struct export_data  querySet;
318
319     PGresult   *res;
320
321     const char *paramValues[1];
322     int         paramLengths[1];
323     int         paramFormats[1];
324     uint32_t    paramPlaceID;
325     uint32_t    place_id;
326     time_t              updateStartTime;
327
328     while (1)
329     {
330         pthread_mutex_lock( thread_data->count_mutex );
331         if (*(thread_data->count) >= thread_data->tuples)
332         {
333             pthread_mutex_unlock( thread_data->count_mutex );
334             break;
335         }
336
337         place_id = PGint32(*((uint32_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
338         (*thread_data->count)++;
339
340         pthread_mutex_unlock( thread_data->count_mutex );
341
342         if (verbose) fprintf(stderr, "  Processing place_id %d\n", place_id);
343
344         updateStartTime = time(0);
345         int done = 0;
346
347         if (thread_data->writer)
348         {
349              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
350         }
351
352         while(!done)
353         {
354
355                 paramPlaceID = PGint32(place_id);
356                 paramValues[0] = (char *)&paramPlaceID;
357                 paramLengths[0] = sizeof(paramPlaceID);
358                 paramFormats[0] = 1;
359                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
360                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
361                         done = 1;
362                 else
363                 {
364                         if (strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
365                         {
366                             fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying\n");
367                             PQclear(res);
368                             sleep(rand() % 10);
369                         }
370                         else
371                         {
372                             fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
373                             PQclear(res);
374                             sleep(rand() % 10);
375 //                          exit(EXIT_FAILURE);
376                         }
377                 }
378         }
379         PQclear(res);
380         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %d\n", place_id);
381
382         if (thread_data->writer)
383         {
384             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
385             nominatim_exportFreeQueries(&querySet);
386         }
387     }
388
389     return NULL;
390 }