2 * triggers indexing (reparenting etc.) by resetting indexed_status: update placex/osmline set indexed_status = 0 where indexed_status > 0
3 * triggers placex_update and osmline_update
17 #include "nominatim.h"
20 #include "postgresql.h"
// Run one indexing pass on `conn` and report progress on stderr.
//   rank                 - search rank bound as $1 into the placex statements
//   interpolation        - presumably selects the *_osmline statements (and
//                          table = 0 for the workers) instead of the placex
//                          ones; the selecting conditions are not visible here
//   conn                 - connection holding the prepared index_* statements
//   num_threads          - number of thread_data entries started per batch
//   thread_data          - per-thread state, (re)filled before every batch
//   structuredoutputfile - when non-NULL an XML export writer is started
24 void run_indexing(int rank, int interpolation, PGconn *conn, int num_threads,
25 struct index_thread_data * thread_data, const char *structuredoutputfile)
27 int tuples, count, sleepcount;
28 pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
35 PGresult * resSectors;
43 const char *paramValues[2];
50 xmlTextWriterPtr writer;
51 pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
53 // Create the output file
55 if (structuredoutputfile)
57 writer = nominatim_exportXMLStart(structuredoutputfile);
62 fprintf(stderr, "Starting interpolation lines (location_property_osmline)\n");
66 fprintf(stderr, "Starting rank %d\n", rank);
// Parameters are passed in binary form: PGint32() converts the value and the
// parameter array receives the address of that converted integer.
72 paramRank = PGint32(rank);
73 paramValues[0] = (char *)&paramRank;
74 paramLengths[0] = sizeof(paramRank);
// Fetch the (geometry_sector, count) list; the trailing 1 requests binary results.
79 resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1);
83 resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
85 if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
87 fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
// The binary values are decoded below as int32 (column 0) and int64 (column 1),
// so the column types are verified first.
91 if (PQftype(resSectors, 0) != PG_OID_INT4)
93 fprintf(stderr, "Sector value has unexpected type\n");
97 if (PQftype(resSectors, 1) != PG_OID_INT8)
99 fprintf(stderr, "Sector value has unexpected type\n");
// Sum the per-sector counts to get the total number of rows to process.
105 for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
107 rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
110 rankStartTime = time(0);
// The loop bound is inclusive: queries are sent with PQsendQueryPrepared only
// while iSector < PQntuples (see below) and fetched with PQgetResult, so the
// extra pass presumably collects the result of the last query sent.
// NOTE(review): the condition guarding the fetch is not visible here - confirm.
111 for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
115 resPlaces = PQgetResult(conn);
116 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
118 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
122 if (PQftype(resPlaces, 0) != PG_OID_INT8)
124 fprintf(stderr, "Place_id value has unexpected type\n");
// A second PQgetResult is issued after the place result; the message below is
// printed when it is not the expected NULL.
128 resNULL = PQgetResult(conn);
131 fprintf(stderr, "Unexpected non-null response\n");
136 if (iSector < PQntuples(resSectors))
138 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
139 // fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
141 // Get all the place_id's for this sector
142 paramRank = PGint32(rank);
143 paramValues[0] = (char *)&paramRank;
144 paramLengths[0] = sizeof(paramRank);
146 paramSector = PGint32(sector);
147 paramValues[1] = (char *)&paramSector;
148 paramLengths[1] = sizeof(paramSector);
// With fewer than num_threads*1000 rows left, the "nosector" statements fetch
// everything that remains in one query instead of going sector by sector.
150 if (rankTotalTuples-rankCountTuples < num_threads*1000)
155 iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1);
159 iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
166 iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1);
171 iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
176 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
185 tuples = PQntuples(resPlaces);
// Every worker gets the same result set and a pointer to the shared `count`
// cursor; nominatim_indexThread claims rows by advancing it under count_mutex.
190 for (i = 0; i < num_threads; i++)
192 thread_data[i].res = resPlaces;
193 thread_data[i].tuples = tuples;
194 thread_data[i].count = &count;
195 thread_data[i].count_mutex = &count_mutex;
196 thread_data[i].writer = writer;
197 thread_data[i].writer_mutex = &writer_mutex;
200 thread_data[i].table = 0; // use interpolations table
204 thread_data[i].table = 1; // use placex table
206 pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
209 // Monitor threads to give user feedback
// NOTE(review): `count` is read here while the workers increment it under
// count_mutex; no lock is visible on this side - confirm whether that is intended.
211 while (count < tuples)
215 // Aim for one update per second
216 if (sleepcount++ > 500)
// MAX(..., 1) keeps the elapsed-seconds divisor from being zero.
218 rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
219 fprintf(stderr, " Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
224 // Wait for everything to finish
225 for (i = 0; i < num_threads; i++)
227 pthread_join(thread_data[i].thread, NULL);
230 rankCountTuples += tuples;
234 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
235 fprintf(stderr, " Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
// Once fewer than num_threads*20 rows remain, jump to the last sector index
// so the remaining sectors are not queried one by one.
239 if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
241 iSector = PQntuples(resSectors) - 1;
245 fprintf(stderr, "\r Done %i in %i @ %f per second - FINISHED\n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
// Entry point of the indexer: connects to the database, prepares the sector /
// place SELECT statements on the main connection, opens one extra connection
// per worker thread with the UPDATE statements prepared on it, then runs
// run_indexing() for every rank from rank_min to rank_max inclusive.
//   rank_min, rank_max   - inclusive range of search ranks to process
//   num_threads          - number of worker connections / threads
//   conninfo             - libpq connection string, used for every connection
//   structuredoutputfile - passed through unchanged to run_indexing()
250 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
252 struct index_thread_data * thread_data;
261 xmlTextWriterPtr writer;
262 pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
264 Oid pg_prepare_params[2];
266 conn = PQconnectdb(conninfo);
267 if (PQstatus(conn) != CONNECTION_OK)
269 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
// Statements for the main connection. Each diagnostic names the statement
// that actually failed so a broken prepare can be located from the log.
// NOTE(review): no PQclear() of `res` is visible between these prepares - confirm.
273 pg_prepare_params[0] = PG_OID_INT4;
274 res = PQprepare(conn, "index_sectors",
275 "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
276 1, pg_prepare_params);
277 if (PQresultStatus(res) != PGRES_COMMAND_OK)
279 fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
284 res = PQprepare(conn, "index_sectors_osmline",
285 "select geometry_sector,count(*) from location_property_osmline where indexed_status > 0 group by geometry_sector order by geometry_sector",
287 if (PQresultStatus(res) != PGRES_COMMAND_OK)
289 fprintf(stderr, "Failed preparing index_sectors_osmline: %s\n", PQerrorMessage(conn));
294 pg_prepare_params[0] = PG_OID_INT4;
295 res = PQprepare(conn, "index_nosectors",
296 "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
297 1, pg_prepare_params);
298 if (PQresultStatus(res) != PGRES_COMMAND_OK)
300 fprintf(stderr, "Failed preparing index_nosectors: %s\n", PQerrorMessage(conn));
305 pg_prepare_params[0] = PG_OID_INT4;
306 pg_prepare_params[1] = PG_OID_INT4;
307 res = PQprepare(conn, "index_sector_places",
308 "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
309 2, pg_prepare_params);
310 if (PQresultStatus(res) != PGRES_COMMAND_OK)
312 fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
317 pg_prepare_params[0] = PG_OID_INT4;
318 res = PQprepare(conn, "index_nosector_places",
319 "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
320 1, pg_prepare_params);
321 if (PQresultStatus(res) != PGRES_COMMAND_OK)
323 fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
328 pg_prepare_params[0] = PG_OID_INT4;
329 res = PQprepare(conn, "index_sector_places_osmline",
330 "select place_id from location_property_osmline where geometry_sector = $1 and indexed_status > 0",
331 1, pg_prepare_params);
332 if (PQresultStatus(res) != PGRES_COMMAND_OK)
334 fprintf(stderr, "Failed preparing index_sector_places_osmline: %s\n", PQerrorMessage(conn));
339 res = PQprepare(conn, "index_nosector_places_osmline",
340 "select place_id from location_property_osmline where indexed_status > 0 order by geometry_sector",
342 if (PQresultStatus(res) != PGRES_COMMAND_OK)
344 fprintf(stderr, "Failed preparing index_nosector_places_osmline: %s\n", PQerrorMessage(conn));
349 // Build the data for each thread
// NOTE(review): the malloc result is used by the loop directly below without
// a NULL check, and no matching free() is visible - confirm.
350 thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
351 for (i = 0; i < num_threads; i++)
353 thread_data[i].conn = PQconnectdb(conninfo);
354 if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
356 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
// The UPDATE statements are prepared on the worker's own connection, so the
// error text must be read from that connection, not from the main `conn`.
360 pg_prepare_params[0] = PG_OID_INT8;
361 res = PQprepare(thread_data[i].conn, "index_placex",
362 "update placex set indexed_status = 0 where place_id = $1",
363 1, pg_prepare_params);
364 if (PQresultStatus(res) != PGRES_COMMAND_OK)
366 fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(thread_data[i].conn));
371 pg_prepare_params[0] = PG_OID_INT8;
372 res = PQprepare(thread_data[i].conn, "index_osmline",
373 "update location_property_osmline set indexed_status = 0 where place_id = $1",
374 1, pg_prepare_params);
375 if (PQresultStatus(res) != PGRES_COMMAND_OK)
377 fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(thread_data[i].conn));
382 /*res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
383 if (PQresultStatus(res) != PGRES_COMMAND_OK)
385 fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
390 nominatim_exportCreatePreparedQueries(thread_data[i].conn);
394 fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
396 for (rank = rank_min; rank <= rank_max; rank++)
398 // OSMLINE: do reindexing (=> reparenting) for interpolation lines at rank 30, but before all other objects of rank 30
399 // reason: houses (rank 30) depend on the updated interpolation line, when reparenting (see placex_update in functions.sql)
402 run_indexing(rank, 1, conn, num_threads, thread_data, structuredoutputfile);
404 run_indexing(rank, 0, conn, num_threads, thread_data, structuredoutputfile);
406 // Close all connections
407 for (i = 0; i < num_threads; i++)
409 PQfinish(thread_data[i].conn);
// Worker thread body, started by pthread_create with a pointer to one
// struct index_thread_data. It repeatedly claims the next place_id from the
// shared result set and runs the prepared UPDATE for it on its own connection.
//   thread_data_in - pointer to this worker's struct index_thread_data
414 void *nominatim_indexThread(void * thread_data_in)
416 struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
417 struct export_data querySet;
421 const char *paramValues[1];
424 uint64_t paramPlaceID;
426 time_t updateStartTime;
429 table = (uint)(thread_data->table);
// Claim one row: the shared cursor *count is read and advanced only while
// count_mutex is held; once it has reached `tuples` the mutex is released
// (the statements following that unlock are not visible here).
433 pthread_mutex_lock( thread_data->count_mutex );
434 if (*(thread_data->count) >= thread_data->tuples)
436 pthread_mutex_unlock( thread_data->count_mutex );
440 place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
441 (*thread_data->count)++;
443 pthread_mutex_unlock( thread_data->count_mutex );
// NOTE(review): %ld assumes place_id is a long; its declaration is not
// visible here - confirm the format matches the declared type.
445 if (verbose) fprintf(stderr, " Processing place_id %ld\n", place_id);
447 updateStartTime = time(0);
// When an export writer is present, the export queries for the place are run
// before the UPDATE and the place itself is exported afterwards (see below).
450 if (thread_data->writer)
452 nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
// The place id is passed as a binary parameter: PGint64() converts it and the
// parameter array receives the address of the converted value.
457 paramPlaceID = PGint64(place_id);
458 paramValues[0] = (char *)&paramPlaceID;
459 paramLengths[0] = sizeof(paramPlaceID);
461 if (table == 1) // table=1 for placex
463 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
465 else // table=0 for osmline
467 res = PQexecPrepared(thread_data->conn, "index_osmline", 1, paramValues, paramLengths, paramFormats, 1);
469 if (PQresultStatus(res) == PGRES_COMMAND_OK)
// A deadlock is reported as "retrying"; any other failure prints the error text.
// NOTE(review): the compared length is 25 but the literal as shown here has
// 24 characters - verify the literal's spacing against the server message.
473 if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR: deadlock detected", 25))
477 fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
481 fprintf(stderr, "index_osmline: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
490 fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
494 fprintf(stderr, "index_osmline: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
// Report updates that took more than one second of wall-clock time.
502 if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, " Slow place_id %ld\n", place_id);
504 if (thread_data->writer)
506 nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
507 nominatim_exportFreeQueries(&querySet);