15 #include "nominatim.h"
18 #include "postgresql.h"
// nominatim_index: for every rank in [rank_min, rank_max] (inclusive), selects
// the placex rows with indexed_status > 0 and hands their place_ids to
// num_threads worker threads running nominatim_indexThread, printing progress
// to stderr.
//
//   rank_min, rank_max    inclusive range of rank_search values to process
//   num_threads           number of worker threads; each gets its own connection
//   conninfo              connection string passed to PQconnectdb
//   structuredoutputfile  when non-NULL, an XML writer is opened on it with
//                         nominatim_exportXMLStart and given to every worker
//
// NOTE(review): in this view of the file the brace lines, several declarations
// (conn, res, i, rank, paramLengths, paramFormats, ...) and some branch/exit
// lines are not visible; the comments below describe only what is shown.
22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
24 struct index_thread_data * thread_data;
// count_mutex protects `count`, the shared index of the next row to process.
25 pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
26 int tuples, count, sleepcount;
35 PGresult * resSectors;
45 const char *paramValues[2];
52 xmlTextWriterPtr writer;
// writer_mutex is handed to the workers together with `writer`.
53 pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
55 Oid pg_prepare_params[2];
// Main connection: used for the sector and place_id SELECTs below.
57 conn = PQconnectdb(conninfo);
58 if (PQstatus(conn) != CONNECTION_OK)
60 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
// index_sectors: number of pending rows (indexed_status > 0) per geometry_sector
// for one rank, ordered by sector.
64 pg_prepare_params[0] = PG_OID_INT4;
65 res = PQprepare(conn, "index_sectors",
66 "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
67 1, pg_prepare_params);
68 if (PQresultStatus(res) != PGRES_COMMAND_OK)
70 fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
// index_nosectors: the same count as a single row with sector 0. The only use
// visible in this function is the commented-out call further down.
75 pg_prepare_params[0] = PG_OID_INT4;
76 res = PQprepare(conn, "index_nosectors",
77 "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
78 1, pg_prepare_params);
79 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): the message names index_sectors, but the statement prepared
// just above is index_nosectors - looks like a copy/paste slip.
81 fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
// index_sector_places: pending place_ids for one rank ($1) and one sector ($2).
86 pg_prepare_params[0] = PG_OID_INT4;
87 pg_prepare_params[1] = PG_OID_INT4;
88 res = PQprepare(conn, "index_sector_places",
89 "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
90 2, pg_prepare_params);
91 if (PQresultStatus(res) != PGRES_COMMAND_OK)
93 fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
// index_nosector_places: all pending place_ids for one rank, ordered by sector.
98 pg_prepare_params[0] = PG_OID_INT4;
99 res = PQprepare(conn, "index_nosector_places",
100 "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
101 1, pg_prepare_params);
102 if (PQresultStatus(res) != PGRES_COMMAND_OK)
104 fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
109 // Build the data for each thread
// NOTE(review): no NULL check on the malloc result is visible here.
110 thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
111 for (i = 0; i < num_threads; i++)
// Every worker gets its own connection, with index_placex prepared on it.
113 thread_data[i].conn = PQconnectdb(conninfo);
114 if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
116 fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
120 pg_prepare_params[0] = PG_OID_INT4;
121 res = PQprepare(thread_data[i].conn, "index_placex",
122 "update placex set indexed_status = 0 where place_id = $1",
123 1, pg_prepare_params);
124 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): the failing call ran on thread_data[i].conn, but the error text
// is read from the main connection `conn` - probably the wrong handle.
126 fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
131 res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
132 if (PQresultStatus(res) != PGRES_COMMAND_OK)
// NOTE(review): same as above - error text comes from `conn`, not from
// thread_data[i].conn on which the command was executed.
134 fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
139 nominatim_exportCreatePreparedQueries(thread_data[i].conn);
142 // Create the output file
// NOTE(review): `writer` is only assigned when structuredoutputfile is set; no
// other initialisation is visible in this view, yet it is copied into every
// thread_data entry below - confirm it is set to NULL somewhere.
144 if (structuredoutputfile)
146 writer = nominatim_exportXMLStart(structuredoutputfile);
// NOTE(review): "treads" in the message below is presumably meant to be "threads".
149 fprintf(stderr, "Starting indexing rank (%i to %i) using %i treads\n", rank_min, rank_max, num_threads);
151 for (rank = rank_min; rank <= rank_max; rank++)
153 fprintf(stderr, "Starting rank %d\n", rank);
// The rank is passed as a raw 4-byte parameter; PGint32 is defined elsewhere
// (presumably a host/network byte-order conversion - confirm).
157 paramRank = PGint32(rank);
// NOTE(review): "¶mRank" looks like a mis-encoded "&paramRank" (the "&para"
// prefix decoded as an HTML entity) - confirm against the original file. The
// same applies to the other "¶m..." occurrences below.
158 paramValues[0] = (char *)¶mRank;
159 paramLengths[0] = sizeof(paramRank);
162 // resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
// Final argument 1 asks libpq for binary-format results, which is why the
// column types are verified and the values read through casts below.
164 resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
166 if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
168 fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
// Column 0 (geometry_sector) must be int4, column 1 (count) must be int8.
172 if (PQftype(resSectors, 0) != PG_OID_INT4)
174 fprintf(stderr, "Sector value has unexpected type\n");
178 if (PQftype(resSectors, 1) != PG_OID_INT8)
// NOTE(review): this check is on the count column, although the message still
// says "Sector value".
180 fprintf(stderr, "Sector value has unexpected type\n");
// Sum the per-sector counts to get the total work for this rank.
186 for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
188 rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
191 rankStartTime = time(0);
// Note the "<=": the loop runs one iteration more than there are sectors.
// NOTE(review): this appears to pipeline the work - a place_id query is sent
// asynchronously (PQsendQueryPrepared) for one sector and its result collected
// with PQgetResult at the top of a later iteration; the conditions guarding
// the fetch are not visible in this view - confirm.
192 for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
196 resPlaces = PQgetResult(conn);
197 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
199 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
203 if (PQftype(resPlaces, 0) != PG_OID_INT4)
205 fprintf(stderr, "Place_id value has unexpected type\n");
// A further PQgetResult is expected to yield NULL, i.e. the command is complete.
209 resNULL = PQgetResult(conn);
212 fprintf(stderr, "Unexpected non-null response\n");
// Only send a new query while there is still a sector row at index iSector.
217 if (iSector < PQntuples(resSectors))
219 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
220 // fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
222 // Get all the place_id's for this sector
223 paramRank = PGint32(rank);
224 paramValues[0] = (char *)¶mRank;
225 paramLengths[0] = sizeof(paramRank);
227 paramSector = PGint32(sector);
228 paramValues[1] = (char *)¶mSector;
229 paramLengths[1] = sizeof(paramSector);
// With fewer than num_threads*1000 rows left in the rank, fetch all remaining
// place_ids at once (index_nosector_places) rather than a single sector.
231 if (rankTotalTuples-rankCountTuples < num_threads*1000)
233 iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
237 iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
241 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
251 tuples = PQntuples(resPlaces);
// All workers share the same result set, the same `count` cursor and the same
// two mutexes; each pulls rows until count reaches tuples.
256 for (i = 0; i < num_threads; i++)
258 thread_data[i].res = resPlaces;
259 thread_data[i].tuples = tuples;
260 thread_data[i].count = &count;
261 thread_data[i].count_mutex = &count_mutex;
262 thread_data[i].writer = writer;
263 thread_data[i].writer_mutex = &writer_mutex;
264 pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
267 // Monitor threads to give user feedback
// NOTE(review): `count` is read here without taking count_mutex while the
// workers increment it; the resets of count/sleepcount and the sleep call are
// not visible in this view.
269 while (count < tuples)
273 // Aim for one update per second
274 if (sleepcount++ > 500)
// Rate = rows finished in this rank so far / elapsed seconds (at least 1).
276 rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
277 fprintf(stderr, " Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
282 // Wait for everything to finish
283 for (i = 0; i < num_threads; i++)
285 pthread_join(thread_data[i].thread, NULL);
// The whole batch is done; fold it into the per-rank running total.
288 rankCountTuples += tuples;
292 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
293 fprintf(stderr, " Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
// With fewer than num_threads*20 rows left, jump to the last sector index so
// that the loop increment lands on iSector == PQntuples(resSectors), the extra
// final iteration in which no new query is sent.
297 if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
299 iSector = PQntuples(resSectors) - 1;
303 fprintf(stderr, "\r Done %i in %i @ %f per second - FINISHED \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
// Close the XML output. NOTE(review): any guard on structuredoutputfile/writer
// around this call is not visible in this view.
310 nominatim_exportXMLEnd(writer);
// nominatim_indexThread: pthread start routine; nominatim_index passes a
// struct index_thread_data * as thread_data_in.
//
// Takes the next row index from the shared result set under count_mutex, reads
// that row's place_id, runs the prepared index_placex UPDATE for it on this
// thread's own connection and, when a writer is set, exports the place.
//
// NOTE(review): the loop, else and return lines of this function are not
// visible in this view, nor are the declarations of res, place_id,
// paramLengths and paramFormats; comments describe only what is shown.
314 void *nominatim_indexThread(void * thread_data_in)
316 struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
317 struct export_data querySet;
321 const char *paramValues[1];
324 uint32_t paramPlaceID;
326 time_t updateStartTime;
// Claim the next row: *count is the shared cursor into thread_data->res and is
// only read and incremented here while count_mutex is held.
330 pthread_mutex_lock( thread_data->count_mutex );
// Nothing left to claim: release the mutex (presumably the thread then leaves
// its work loop - the exit statement is not visible here).
331 if (*(thread_data->count) >= thread_data->tuples)
333 pthread_mutex_unlock( thread_data->count_mutex );
337 place_id = PGint32(*((uint32_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
338 (*thread_data->count)++;
340 pthread_mutex_unlock( thread_data->count_mutex );
342 if (verbose) fprintf(stderr, " Processing place_id %d\n", place_id);
// Start time, used for the "Slow place_id" report below.
344 updateStartTime = time(0);
// When exporting, nominatim_exportPlaceQueries is called with querySet before
// the UPDATE is run.
347 if (thread_data->writer)
349 nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
// NOTE(review): "¶mPlaceID" below looks like a mis-encoded "&paramPlaceID"
// (the "&para" prefix decoded as an HTML entity) - confirm against the
// original file.
355 paramPlaceID = PGint32(place_id);
356 paramValues[0] = (char *)¶mPlaceID;
357 paramLengths[0] = sizeof(paramPlaceID);
359 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
360 if (PQresultStatus(res) == PGRES_COMMAND_OK)
// NOTE(review): strncmp returns 0 when the strings match, so as written the
// "deadlock, retrying" message is printed when the error text does NOT start
// with the deadlock prefix - the test looks inverted (missing "== 0").
// Also, the literal as shown here is 24 characters while n is 25; whitespace
// inside the literal may have been collapsed in this view - confirm.
364 if (strncmp(PQerrorMessage(thread_data->conn), "ERROR: deadlock detected", 25))
366 fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying\n");
372 fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
375 // exit(EXIT_FAILURE);
// Report any place whose processing took more than one second.
380 if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, " Slow place_id %d\n", place_id);
// When exporting, write the place through the shared writer (writer_mutex is
// passed along) and release the query set obtained earlier.
382 if (thread_data->writer)
384 nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
385 nominatim_exportFreeQueries(&querySet);