15 #include "nominatim.h"
 
  18 #include "postgresql.h"
 
  22 void nominatim_index(int rank_min, int rank_max, int num_threads, const char *conninfo, const char *structuredoutputfile)
 
  24     struct index_thread_data * thread_data;
 
  25     pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
 
  26     int tuples, count, sleepcount;
 
  35     PGresult * resSectors;
 
  44     const char *paramValues[2];
 
  51     xmlTextWriterPtr writer;
 
  52     pthread_mutex_t writer_mutex = PTHREAD_MUTEX_INITIALIZER;
 
  54     Oid pg_prepare_params[2];
 
  56     conn = PQconnectdb(conninfo);
 
  57     if (PQstatus(conn) != CONNECTION_OK)
 
  59         fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(conn));
 
  63     pg_prepare_params[0] = PG_OID_INT4;
 
  64     res = PQprepare(conn, "index_sectors",
 
  65                     "select geometry_sector,count(*) from placex where rank_search = $1 and indexed_status > 0 group by geometry_sector order by geometry_sector",
 
  66                     1, pg_prepare_params);
 
  67     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
  69         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
 
  74     pg_prepare_params[0] = PG_OID_INT4;
 
  75     res = PQprepare(conn, "index_nosectors",
 
  76                     "select 0::integer,count(*) from placex where rank_search = $1 and indexed_status > 0",
 
  77                     1, pg_prepare_params);
 
  78     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
  80         fprintf(stderr, "Failed preparing index_sectors: %s\n", PQerrorMessage(conn));
 
  85     pg_prepare_params[0] = PG_OID_INT4;
 
  86     pg_prepare_params[1] = PG_OID_INT4;
 
  87     res = PQprepare(conn, "index_sector_places",
 
  88                     "select place_id from placex where rank_search = $1 and geometry_sector = $2 and indexed_status > 0",
 
  89                     2, pg_prepare_params);
 
  90     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
  92         fprintf(stderr, "Failed preparing index_sector_places: %s\n", PQerrorMessage(conn));
 
  97     pg_prepare_params[0] = PG_OID_INT4;
 
  98     res = PQprepare(conn, "index_nosector_places",
 
  99                     "select place_id from placex where rank_search = $1 and indexed_status > 0 order by geometry_sector",
 
 100                     1, pg_prepare_params);
 
 101     if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 103         fprintf(stderr, "Failed preparing index_nosector_places: %s\n", PQerrorMessage(conn));
 
 108     // Build the data for each thread
 
 109     thread_data = (struct index_thread_data *)malloc(sizeof(struct index_thread_data)*num_threads);
 
 110     for (i = 0; i < num_threads; i++)
 
 112         thread_data[i].conn = PQconnectdb(conninfo);
 
 113         if (PQstatus(thread_data[i].conn) != CONNECTION_OK)
 
 115             fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(thread_data[i].conn));
 
 119         pg_prepare_params[0] = PG_OID_INT8;
 
 120         res = PQprepare(thread_data[i].conn, "index_placex",
 
 121                         "update placex set indexed_status = 0 where place_id = $1",
 
 122                         1, pg_prepare_params);
 
 123         if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 125             fprintf(stderr, "Failed preparing index_placex: %s\n", PQerrorMessage(conn));
 
 130         /*res = PQexec(thread_data[i].conn, "set enable_seqscan = false");
 
 131         if (PQresultStatus(res) != PGRES_COMMAND_OK)
 
 133             fprintf(stderr, "Failed disabling sequential scan: %s\n", PQerrorMessage(conn));
 
 138         nominatim_exportCreatePreparedQueries(thread_data[i].conn);
 
 141     // Create the output file
 
 143     if (structuredoutputfile)
 
 145         writer = nominatim_exportXMLStart(structuredoutputfile);
 
 148     fprintf(stderr, "Starting indexing rank (%i to %i) using %i threads\n", rank_min, rank_max, num_threads);
 
 150     for (rank = rank_min; rank <= rank_max; rank++)
 
 152         fprintf(stderr, "Starting rank %d\n", rank);
 
 156         paramRank = PGint32(rank);
 
 157         paramValues[0] = (char *)¶mRank;
 
 158         paramLengths[0] = sizeof(paramRank);
 
 161 //            resSectors = PQexecPrepared(conn, "index_nosectors", 1, paramValues, paramLengths, paramFormats, 1);
 
 163         resSectors = PQexecPrepared(conn, "index_sectors", 1, paramValues, paramLengths, paramFormats, 1);
 
 165         if (PQresultStatus(resSectors) != PGRES_TUPLES_OK)
 
 167             fprintf(stderr, "index_sectors: SELECT failed: %s", PQerrorMessage(conn));
 
 171         if (PQftype(resSectors, 0) != PG_OID_INT4)
 
 173             fprintf(stderr, "Sector value has unexpected type\n");
 
 177         if (PQftype(resSectors, 1) != PG_OID_INT8)
 
 179             fprintf(stderr, "Sector value has unexpected type\n");
 
 185         for (iSector = 0; iSector < PQntuples(resSectors); iSector++)
 
 187             rankTotalTuples += PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1)));
 
 190         rankStartTime = time(0);
 
 191         for (iSector = 0; iSector <= PQntuples(resSectors); iSector++)
 
 195                 resPlaces = PQgetResult(conn);
 
 196                 if (PQresultStatus(resPlaces) != PGRES_TUPLES_OK)
 
 198                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
 
 202                 if (PQftype(resPlaces, 0) != PG_OID_INT8)
 
 204                     fprintf(stderr, "Place_id value has unexpected type\n");
 
 208                 resNULL = PQgetResult(conn);
 
 211                     fprintf(stderr, "Unexpected non-null response\n");
 
 216             if (iSector < PQntuples(resSectors))
 
 218                 sector = PGint32(*((uint32_t *)PQgetvalue(resSectors, iSector, 0)));
 
 219 //                fprintf(stderr, "\n Starting sector %d size %ld\n", sector, PGint64(*((uint64_t *)PQgetvalue(resSectors, iSector, 1))));
 
 221                 // Get all the place_id's for this sector
 
 222                 paramRank = PGint32(rank);
 
 223                 paramValues[0] = (char *)¶mRank;
 
 224                 paramLengths[0] = sizeof(paramRank);
 
 226                 paramSector = PGint32(sector);
 
 227                 paramValues[1] = (char *)¶mSector;
 
 228                 paramLengths[1] = sizeof(paramSector);
 
 230                 if (rankTotalTuples-rankCountTuples < num_threads*1000)
 
 232                     iResult = PQsendQueryPrepared(conn, "index_nosector_places", 1, paramValues, paramLengths, paramFormats, 1);
 
 236                     iResult = PQsendQueryPrepared(conn, "index_sector_places", 2, paramValues, paramLengths, paramFormats, 1);
 
 240                     fprintf(stderr, "index_sector_places: SELECT failed: %s", PQerrorMessage(conn));
 
 250                 tuples = PQntuples(resPlaces);
 
 255                     for (i = 0; i < num_threads; i++)
 
 257                         thread_data[i].res = resPlaces;
 
 258                         thread_data[i].tuples = tuples;
 
 259                         thread_data[i].count = &count;
 
 260                         thread_data[i].count_mutex = &count_mutex;
 
 261                         thread_data[i].writer = writer;
 
 262                         thread_data[i].writer_mutex = &writer_mutex;
 
 263                         pthread_create(&thread_data[i].thread, NULL, &nominatim_indexThread, (void *)&thread_data[i]);
 
 266                     // Monitor threads to give user feedback
 
 268                     while (count < tuples)
 
 272                         // Aim for one update per second
 
 273                         if (sleepcount++ > 500)
 
 275                             rankPerSecond = ((float)rankCountTuples + (float)count) / MAX(difftime(time(0), rankStartTime),1);
 
 276                             fprintf(stderr, "  Done %i in %i @ %f per second - Rank %i ETA (seconds): %f\n", (rankCountTuples + count), (int)(difftime(time(0), rankStartTime)), rankPerSecond, rank, ((float)(rankTotalTuples - (rankCountTuples + count)))/rankPerSecond);
 
 281                     // Wait for everything to finish
 
 282                     for (i = 0; i < num_threads; i++)
 
 284                         pthread_join(thread_data[i].thread, NULL);
 
 287                     rankCountTuples += tuples;
 
 291                 rankPerSecond = (float)rankCountTuples / MAX(difftime(time(0), rankStartTime),1);
 
 292                 fprintf(stderr, "  Done %i in %i @ %f per second - ETA (seconds): %f\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond, ((float)(rankTotalTuples - rankCountTuples))/rankPerSecond);
 
 296             if (rankTotalTuples-rankCountTuples < num_threads*20 && iSector < PQntuples(resSectors))
 
 298                 iSector = PQntuples(resSectors) - 1;
 
 302         fprintf(stderr, "\r  Done %i in %i @ %f per second - FINISHED                      \n\n", rankCountTuples, (int)(difftime(time(0), rankStartTime)), rankPerSecond);
 
 309         nominatim_exportXMLEnd(writer);
 
 312     // Close all connections
 
 313     for (i = 0; i < num_threads; i++)
 
 315         PQfinish(thread_data[i].conn);
 
 320 void *nominatim_indexThread(void * thread_data_in)
 
 322     struct index_thread_data * thread_data = (struct index_thread_data * )thread_data_in;
 
 323     struct export_data  querySet;
 
 327     const char *paramValues[1];
 
 330     uint64_t    paramPlaceID;
 
 332     time_t              updateStartTime;
 
 336         pthread_mutex_lock( thread_data->count_mutex );
 
 337         if (*(thread_data->count) >= thread_data->tuples)
 
 339             pthread_mutex_unlock( thread_data->count_mutex );
 
 343         place_id = PGint64(*((uint64_t *)PQgetvalue(thread_data->res, *thread_data->count, 0)));
 
 344         (*thread_data->count)++;
 
 346         pthread_mutex_unlock( thread_data->count_mutex );
 
 348         if (verbose) fprintf(stderr, "  Processing place_id %ld\n", place_id);
 
 350         updateStartTime = time(0);
 
 353         if (thread_data->writer)
 
 355              nominatim_exportPlaceQueries(place_id, thread_data->conn, &querySet);
 
 360                 paramPlaceID = PGint64(place_id);
 
 361                 paramValues[0] = (char *)¶mPlaceID;
 
 362                 paramLengths[0] = sizeof(paramPlaceID);
 
 364                 res = PQexecPrepared(thread_data->conn, "index_placex", 1, paramValues, paramLengths, paramFormats, 1);
 
 365                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
 
 369                         if (!strncmp(PQerrorMessage(thread_data->conn), "ERROR:  deadlock detected", 25))
 
 371                             fprintf(stderr, "index_placex: UPDATE failed - deadlock, retrying (%ld)\n", place_id);
 
 377                             fprintf(stderr, "index_placex: UPDATE failed: %s", PQerrorMessage(thread_data->conn));
 
 384         if (difftime(time(0), updateStartTime) > 1) fprintf(stderr, "  Slow place_id %ld\n", place_id);
 
 386         if (thread_data->writer)
 
 388             nominatim_exportPlace(place_id, thread_data->conn, thread_data->writer, thread_data->writer_mutex, &querySet);
 
 389             nominatim_exportFreeQueries(&querySet);