2  * triggers indexing (reparenting etc.) through setting resetting indexed_status: update placex/osmline set indexed_status = 0 where indexed_status > 0
 
   3  * triggers placex_update and osmline_update
 
  17 #include "nominatim.h"
 
  20 #include "postgresql.h"
 
  24 void run_indexing(int rank, int interpolation, PGconn *conn, int num_threads, 
 
  25 struct index_thread_data * thread_data, const char *structuredoutputfile)
 
  27     int tuples, count, sleepcount;
 
  28     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
 
  35     PGresult * resSectors;
 
  43     const char *paramValues[2];
 
  50     xmlTextWriterPtr writer;
 
  51     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
 
  53     // Create the output file
 
  55     if (structuredoutputfile)
 
  57         writer = nominatim_exportXMLStart(structuredoutputfile);
 
  62         fprintf(stderr, "Starting interpolation lines (location_property_osmline)\n");
 
  66         fprintf(stderr, "Starting rank %d\n", rank);
 
  72     paramRank = PGint32(rank);
 
  73     paramValues[0] = (char *)¶mRank;
 
  74     paramLengths[0] = sizeof(paramRank);
 
  79         resSectors = PQexecPrepared(conn, "index_sectors_osmline", 0, NULL, 0, NULL, 1);
 
  83         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
 
  85     if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
 
  87         fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
 
  91     if (PQftype(resSectors, 0) != PG_OID_INT4)
 
  93         fprintf(stderr, "Sector value has unexpected type\n");
 
  97     if (PQftype(resSectors, 1) != PG_OID_INT8)
 
  99         fprintf(stderr, "Sector value has unexpected type\n");
 
 105     for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
 
 107         rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
 
 110     rankStartTime = time(0);
 
 111     for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
 
 115             resPlaces = PQgetResult(conn);
 
 116             if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
 
 118                 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
 
 122             if (PQftype(resPlaces, 0) != PG_OID_INT8)
 
 124                 fprintf(stderr, "Place_id value has unexpected type\n");
 
 128             resNULL = PQgetResult(conn);
 
 131                 fprintf(stderr, "Unexpected non-null response\n");
 
 136         if (iSector < PQntuples(resSectors))
 
 138             sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
 
 139 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
 
 141             // Get all the place_id's for this sector
 
 142             paramRank = PGint32(rank);
 
 143             paramSector = PGint32(sector);
 
 144             if (rankTotalTuples-rankCountTuples < num_threads*1000)
 
 149                     iResult = PQsendQueryPrepared(conn, "index_nosector_places_osmline", 0, NULL, 0, NULL, 1);
 
 153                     paramValues[0] = (char *)¶mRank;
 
 154                     paramLengths[0] = sizeof(paramRank);
 
 156                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
 
 163                     iResult = PQsendQueryPrepared(conn, "index_sector_places_osmline", 1, paramValues, paramLengths, paramFormats, 1);
 
 164                     paramValues[0] = (char *)¶mSector;
 
 165                     paramLengths[0] = sizeof(paramSector);
 
 170                     paramValues[0] = (char *)¶mRank;
 
 171                     paramLengths[0] = sizeof(paramRank);
 
 173                     paramValues[1] = (char *)¶mSector;
 
 174                     paramLengths[1] = sizeof(paramSector);
 
 176                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
 
 181                 fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
 
 190             tuples = PQntuples(resPlaces);
 
 195                 for (i = 0; i < num_threads; i++)
 
 197                     thread_data[i].res = resPlaces;
 
 198                     thread_data[i].tuples = tuples;
 
 199                     thread_data[i].count = &count;
 
 200                     thread_data[i].count_mutex = &count_mutex;
 
 201                     thread_data[i].writer = writer;
 
 202                     thread_data[i].writer_mutex = &writer_mutex;
 
 205                         thread_data[i].table = 0;  // use interpolations table
 
 209                         thread_data[i].table = 1;  // use placex table
 
 211                     pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
 
 214                 // Monitor threads to give user feedback
 
 216                 while (count < tuples)
 
 220                     // Aim for one update per second
 
 221                     if (sleepcount++ > 1000)
 
 223                         rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
 
 226                             fprintf(stderr, "  Done %i in %i @ %f per second - Interpolation lines ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
 
 230                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
 
 237                 // Wait for everything to finish
 
 238                 for (i = 0; i < num_threads; i++)
 
 240                     pthread_join(thread_data[i].thread, NULL);
 
 243                 rankCountTuples += tuples;
 
 247             rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
 
 248             fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
 
 252         if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
 
 254             iSector = PQntuples(resSectors) - 1;
 
 258     fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED\n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
 
 263 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
 
 265     struct index_thread_data * thread_data;
 
 274     xmlTextWriterPtr writer;
 
 275     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 277     Oid pg_prepare_params[2];
 
 279     conn = PQconnectdb(conninfo);
 
 280     if (PQstatus(conn) != CONNECTION_OK)
 
 282         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
 
 286     pg_prepare_params[0] = PG_OID_INT4;
 
 287     res = PQprepare(conn, "index_sectors",
 
 288                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
 
 289                     1, pg_prepare_params);
 
 290     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 292         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
 
 297     res = PQprepare(conn, "index_sectors_osmline",
 
 298                     "select geometry_sector,count(*) from location_property_osmline where indexed_status > 0 group by geometry_sector order by geometry_sector",
 
 300     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 302         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
 
 307     pg_prepare_params[0] = PG_OID_INT4;
 
 308     res = PQprepare(conn, "index_nosectors",
 
 309                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
 
 310                     1, pg_prepare_params);
 
 311     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 313         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
 
 318     pg_prepare_params[0] = PG_OID_INT4;
 
 319     pg_prepare_params[1] = PG_OID_INT4;
 
 320     res = PQprepare(conn, "index_sector_places",
 
 321                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
 
 322                     2, pg_prepare_params);
 
 323     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 325         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
 
 330     pg_prepare_params[0] = PG_OID_INT4;
 
 331     res = PQprepare(conn, "index_nosector_places",
 
 332                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
 
 333                     1, pg_prepare_params);
 
 334     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 336         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
 
 341     pg_prepare_params[0] = PG_OID_INT4;
 
 342     res = PQprepare(conn, "index_sector_places_osmline",
 
 343                     "select place_id from location_property_osmline where geometry_sector = $1 and indexed_status > 0",
 
 344                     1, pg_prepare_params);
 
 345     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 347         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
 
 352     res = PQprepare(conn, "index_nosector_places_osmline",
 
 353                     "select place_id from location_property_osmline where indexed_status > 0 order by geometry_sector",
 
 355     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 357         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
 
 362     // Build the data for each thread
 
 363     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
 
 364     for (i = 0; i < num_threads; i++)
 
 366         thread_data[i].conn = PQconnectdb(conninfo);
 
 367         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
 
 369             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
 
 373         pg_prepare_params[0] = PG_OID_INT8;
 
 374         res = PQprepare(thread_data[i].conn, "index_placex",
 
 375                         "update placex set indexed_status = 0 where place_id = $1",
 
 376                         1, pg_prepare_params);
 
 377         if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 379             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(thread_data[i].conn));
 
 384         pg_prepare_params[0] = PG_OID_INT8;
 
 385         res = PQprepare(thread_data[i].conn, "index_osmline",
 
 386                         "update location_property_osmline set indexed_status = 0 where place_id = $1",
 
 387                         1, pg_prepare_params);
 
 388         if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 390             fprintf(stderr, "Failed preparing index_osmline: %s\n", PQerrorMessage(thread_data[i].conn));
 
 395         // Make sure the error message is not localized as we parse it later.
 
 396         res = PQexec(thread_data[i].conn, "SET lc_messages TO 'C'");
 
 397         if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 399             fprintf(stderr, "Failed to set langauge: %s\n", PQerrorMessage(thread_data[i].conn));
 
 404         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
 
 408     fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
 
 410     for (rank = rank_min; rank <= rank_max; rank++)
 
 412         // OSMLINE: do reindexing (=> reparenting) for interpolation lines at rank 30, but before all other objects of rank 30
 
 413         // reason: houses (rank 30) depend on the updated interpolation line, when reparenting (see placex_update in functions.sql)
 
 416             run_indexing(rank, 1, conn, num_threads, thread_data, structuredoutputfile);
 
 418         run_indexing(rank, 0, conn, num_threads, thread_data, structuredoutputfile);
 
 420         // Close all connections
 
 421         for (i = 0; i < num_threads; i++)
 
 423                 PQfinish(thread_data[i].conn);
 
 428 void *nominatim_indexThread(void * thread_data_in)
 
 430     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
 
 431     struct export_data  querySet;
 
 435     const char  *paramValues[1];
 
 438     uint64_t    paramPlaceID;
 
 440     time_t      updateStartTime;
 
 443     table = (uint)(thread_data->table);
 
 447         pthread_mutex_lock( thread_data->count_mutex );
 
 448         if (*(thread_data->count) >= thread_data->tuples)
 
 450             pthread_mutex_unlock( thread_data->count_mutex );
 
 454         place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
 
 455         (*thread_data->count)++;
 
 457         pthread_mutex_unlock( thread_data->count_mutex );
 
 459         if (verbose) fprintf(stderr, "  Processing place_id %ld\n", place_id);
 
 461         updateStartTime = time(0);
 
 464         if (thread_data->writer)
 
 466              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
 
 471             paramPlaceID = PGint64(place_id);
 
 472             paramValues[0] = (char *)¶mPlaceID;
 
 473             paramLengths[0] = sizeof(paramPlaceID);
 
 475             if (table == 1) // table=1 for placex
 
 477                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
 
 479             else // table=0 for osmline
 
 481                 res = PQexecPrepared(thread_data->conn, "index_osmline", 1, paramValues, paramLengths, paramFormats, 1);
 
 483             if (PQresultStatus(res) == PGRES_COMMAND_OK)
 
 487                 if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
 
 491                         fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
 
 495                         fprintf(stderr, "index_osmline: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
 
 504                         fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
 
 508                         fprintf(stderr, "index_osmline: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
 
 516         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %ld\n", place_id);
 
 518         if (thread_data->writer)
 
 520             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
 
 521             nominatim_exportFreeQueries(&querySet);