Use 64 bit integers for object ID values
[planetdump.git] / planet06_pg.cpp
1 #ifndef _GNU_SOURCE
2 #define _GNU_SOURCE
3 #endif
4
5 #undef USE_ICONV
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdarg.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/socket.h>
14 #include <sys/un.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <limits.h>
19 #include <time.h>
20 #include <utime.h>
21
22 #include <pqxx/pqxx>
23 #include <stdexcept>
24 #include <signal.h>
25 #include <stdarg.h>
26 #include <assert.h>
27 #include <cstdlib>
28 #include <cstdint>
29
30 #include "users.hpp"
31 extern "C" {
32 #include "keyvals.h"
33 #include "output_osm.h"
34 }
35
36 #define SCALE 10000000.0
37
38 using namespace std;
39 using namespace pqxx;
40
41 const char *reformDate(const char *str)
42 {
43     static char out[64], prev[64]; // Not thread safe
44
45     time_t tmp;
46     struct tm tm;
47
48     // Re-use the previous answer if we asked to convert the same timestamp twice
49     // This accelerates bulk uploaded data where sequential features often have the same timestamp
50     if (!strncmp(prev, str, sizeof(prev)))
51         return out;
52     else
53         strncpy(prev, str, sizeof(prev));
54
55     // 2007-05-20 13:51:35
56     bzero(&tm, sizeof(tm));
57     int n = sscanf(str, "%d-%d-%d %d:%d:%d",
58                    &tm.tm_year, &tm.tm_mon, &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec);
59
60     if (n !=6)
61         printf("failed to parse date string, got(%d): %s\n", n, str);
62
63     tm.tm_year -= 1900;
64     tm.tm_mon  -= 1;
65     tm.tm_isdst = -1;
66
67     //2007-07-10T11:32:32Z
68     snprintf(out, sizeof(out), "%d-%02d-%02dT%02d:%02d:%02dZ",
69              tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
70
71     return out;
72 }
73
74 /**
75  * Uses a cursor through a particular table to give the appearance 
76  * of an array of elements. This only works if the IDs are accessed 
77  * in a strictly ascending order and its only efficient if most of 
78  * the elements are accessed.
79  */
80 template <typename id_type>
81 class table_stream {
82 public:
83
84   table_stream(pqxx::work &x, string query, string name) 
85     : stream(x, query, name, 10000),
86       ic_itr(stream), ic_end(), next_id(0) {
87     // initialise the iterators to the beginning of the table
88     // and read the first element from it.
89     r_itr = ic_itr->begin();
90
91     if (r_itr != ic_itr->end()) {
92       if (!(*r_itr)[0].to<id_type>(next_id)) {
93         throw std::runtime_error("ID is not numeric.");
94       }
95     } else {
96       next_id = std::numeric_limits<id_type>::max();
97     }
98   }
99
100   bool find_id(id_type id) {
101     while (id > next_id) {
102       next();
103     }
104
105     return id == next_id;
106   }
107
108 private:
109
110   icursorstream stream;
111   icursor_iterator ic_itr;
112   const icursor_iterator ic_end;
113
114 protected:
115
116   void next() {
117     ++r_itr;
118     if (r_itr == ic_itr->end()) {
119       // we're at the end of this chunk, so grab another chunk from
120       // the cursor
121       ++ic_itr;
122       if (ic_itr == ic_end) {
123         // reached the end of the table, so flag this to the other
124         // routines by setting next_id to an invalid value
125         next_id = std::numeric_limits<id_type>::max();
126       } else {
127         r_itr = ic_itr->begin();
128       }
129     }
130
131     // get the next ID unless the end of the table has been hit
132     if (next_id < std::numeric_limits<id_type>::max()) {
133       if (!(*r_itr)[0].to<id_type>(next_id)) {
134         throw std::runtime_error("Next ID is non-numeric.");
135       }
136     }
137   }
138
139   result::const_iterator r_itr;
140   id_type next_id;
141 };
142
143 /**
144  * extends the table stream to provide utility methods for getting
145  * tags out of the table.
146  */
147 struct tag_stream 
148   : public table_stream<int64_t> {
149
150    tag_stream(pqxx::work &x, const char *table, const char *type) 
151       : table_stream<int64_t>(x, query(table, type), "fetch_tags") {
152   }
153
154   bool get(int64_t id, struct keyval *kv) {
155     bool has_tags = find_id(id);
156     resetList(kv);
157     if (has_tags) {
158       while (id == next_id) {
159         addItem(kv, (*r_itr)[1].c_str(), (*r_itr)[2].c_str(), 0);
160         next();
161       }
162     }
163     return has_tags;
164   }
165
166 private:
167
168   string query(const char *table, const char *type) {
169     ostringstream ostr;
170     ostr << "select " << type << "_id, k, v from " << table << " order by " << type << "_id, k";
171     return ostr.str();
172   }
173 };
174
175 /**
176  * gets way nodes out of the stream and returns them in the 
177  * keyval struct that the output functions are expecting.
178  */
179 struct way_node_stream 
180   : public table_stream<int64_t> {
181
182   way_node_stream(pqxx::work &x) 
183     : table_stream<int64_t>(x, "select way_id, node_id from current_way_nodes "
184                             "ORDER BY way_id, sequence_id", "fetch_way_nodes") {
185   }
186
187   bool get(int64_t id, struct keyval *kv) {
188     bool has_nodes = find_id(id);
189     resetList(kv);
190     if (has_nodes) {
191       while (id == next_id) {
192         addItem(kv, "", (*r_itr)[1].c_str(), 0);
193         next();
194       }
195     }
196     return has_nodes;
197   }
198 };
199
200 /**
201  * gets relation members out of the stream and returns them in the 
202  * pair of keyval structs that the output functions are expecting.
203  */
204 struct relation_member_stream 
205   : public table_stream<int64_t> {
206
207   relation_member_stream(pqxx::work &x) 
208     : table_stream<int64_t>(x, "select relation_id, member_id, member_type, "
209                             "member_role from current_relation_members "
210                             "ORDER BY relation_id, sequence_id", "fetch_relation_members") {
211   }
212
213   bool get(int64_t id, struct keyval *members, struct keyval *roles) {
214     bool has_members = find_id(id);
215     resetList(members);
216     resetList(roles);
217     if (has_members) {
218       while (id == next_id) {
219         addItem(members, (*r_itr)[2].c_str(), (*r_itr)[1].c_str(), 0);
220         addItem(roles, "", (*r_itr)[3].c_str(), 0);
221         next();
222       }
223     }
224     return has_members;
225   }
226 };
227
228 void changesets(pqxx::work &xaction) {
229   // work around reformDate returning pointer to static storage :-(
230   static char created_at[64];
231   struct keyval tags;
232   initList(&tags);
233
234   ostringstream query;
235   query << "select id, user_id, created_at, closed_at, num_changes, "
236         << "min_lat, max_lat, min_lon, max_lon, closed_at > now() at time zone 'utc' as open "
237         << "from changesets c order by id";
238
239   icursorstream changesets(xaction, query.str(), "fetch_changesets", 1000);
240   tag_stream tagstream(xaction, "changeset_tags", "changeset");
241
242   const icursor_iterator ic_end;
243   for (icursor_iterator ic_itr(changesets); ic_itr != ic_end; ++ic_itr) {
244     const pqxx::result &res = *ic_itr;
245     for (pqxx::result::const_iterator itr = res.begin();
246          itr != res.end(); ++itr) {
247       const pqxx::result::tuple &row = *itr;
248       const int64_t id = row[0].as<int64_t>();
249       const int num_changes = row[4].as<int>();
250       const bool null_bbox = row[5].is_null() || row[6].is_null() || row[7].is_null() || row[8].is_null();
251
252       if (!tagstream.get(id, &tags)) {
253         resetList(&tags);
254       }
255
256       // work around reformDate returning pointer to static data
257       strncpy(created_at, reformDate(row[2].c_str()), 64);
258
259       osm_changeset(id,
260                     lookup_user(row[1].c_str()), // user_id
261                     created_at, // created_at
262                     reformDate(row[3].c_str()), // closed_at
263             row[4].as<int>(), // num_changes
264                     null_bbox ? 0 : 1,
265                     null_bbox ? 0 : row[5].as<int>() / SCALE, // min_lat
266                     null_bbox ? 0 : row[6].as<int>() / SCALE, // max_lat
267                     null_bbox ? 0 : row[7].as<int>() / SCALE, // min_lon
268                     null_bbox ? 0 : row[8].as<int>() / SCALE, // max_lon
269                     row[9].as<bool>() ? 1 : 0, // open
270                     &tags);
271     }
272   }
273
274   resetList(&tags);
275 }
276
277 void nodes(pqxx::work &xaction) {
278   struct keyval tags;
279   initList(&tags);
280   
281   ostringstream query;
282   query << "select n.id, n.latitude, n.longitude, n.timestamp, "
283         << "c.user_id, n.version, n.changeset_id "
284         << "from current_nodes n join changesets c on n.changeset_id=c.id "
285         << "where n.visible = true order by n.id";
286   
287   icursorstream nodes(xaction, query.str(), "fetch_nodes", 1000);
288   tag_stream tagstream(xaction, "current_node_tags", "node");
289
290   const icursor_iterator ic_end;
291   for (icursor_iterator ic_itr(nodes); ic_itr != ic_end; ++ic_itr) {
292     const pqxx::result &res = *ic_itr;
293     for (pqxx::result::const_iterator itr = res.begin();
294          itr != res.end(); ++itr) {
295       int64_t id;
296       int version, latitude, longitude, changeset;
297
298       if (!(*itr)[0].to<int64_t>(id)) {
299         throw std::runtime_error("Node ID is not numeric.");
300       }
301       if (!(*itr)[1].to<int>(latitude)) {
302         throw std::runtime_error("Latitude is not numeric.");
303       }
304       if (!(*itr)[2].to<int>(longitude)) {
305         throw std::runtime_error("Longitude is not numeric.");
306       }
307       if (!(*itr)[5].to<int>(version)) {
308         throw std::runtime_error("Version is not numeric.");
309       }
310       if (!(*itr)[6].to<int>(changeset)) {
311         throw std::runtime_error("Changeset ID is not numeric.");
312       }
313
314       if (!tagstream.get(id, &tags)) {
315         resetList(&tags);
316       }
317
318       osm_node(id, latitude / SCALE, longitude / SCALE, &tags, 
319                reformDate((*itr)[3].c_str()), 
320                lookup_user((*itr)[4].c_str()), 
321                version, changeset);
322     }
323   }
324
325   resetList(&tags);
326 }
327
328 void ways(pqxx::work &xaction) {
329   struct keyval tags, nodes;
330   initList(&tags);
331   initList(&nodes);
332   
333   ostringstream query;
334   query << "select w.id, w.timestamp, cs.user_id, w.version, "
335         << "w.changeset_id from current_ways w join changesets cs on "
336         << "w.changeset_id=cs.id where visible = true order by id";
337   
338   icursorstream ways(xaction, query.str(), "fetch_ways", 1000);
339   tag_stream tagstream(xaction, "current_way_tags", "way");
340   way_node_stream nodestream(xaction);
341
342   const icursor_iterator ic_end;
343   for (icursor_iterator ic_itr(ways); ic_itr != ic_end; ++ic_itr) {
344     const pqxx::result &res = *ic_itr;
345     for (pqxx::result::const_iterator itr = res.begin();
346          itr != res.end(); ++itr) {
347       int64_t id;
348       int version, changeset;
349
350       if (!(*itr)[0].to<int64_t>(id)) {
351         throw std::runtime_error("Node ID is not numeric.");
352       }
353       if (!(*itr)[3].to<int>(version)) {
354         throw std::runtime_error("Version is not numeric.");
355       }
356       if (!(*itr)[4].to<int>(changeset)) {
357         throw std::runtime_error("Changeset ID is not numeric.");
358       }
359
360       tagstream.get(id, &tags);
361       nodestream.get(id, &nodes);
362
363       osm_way(id, &nodes, &tags, 
364               reformDate((*itr)[1].c_str()), 
365               lookup_user((*itr)[2].c_str()), 
366               version, changeset);
367     }
368   }
369
370   resetList(&tags);
371   resetList(&nodes);
372 }
373
374 void relations(pqxx::work &xaction) {
375   struct keyval tags, members, roles;
376   initList(&tags);
377   initList(&members);
378   initList(&roles);
379   
380   ostringstream query;
381   query << "select r.id, r.timestamp, c.user_id, r.version, r.changeset_id "
382         << "from current_relations r join changesets c on "
383         << "r.changeset_id=c.id where visible = true ORDER BY id";
384   
385   icursorstream relations(xaction, query.str(), "fetch_relations", 1000);
386   tag_stream tagstream(xaction, "current_relation_tags", "relation");
387   relation_member_stream memstream(xaction);
388
389   const icursor_iterator ic_end;
390   for (icursor_iterator ic_itr(relations); ic_itr != ic_end; ++ic_itr) {
391     const pqxx::result &res = *ic_itr;
392     for (pqxx::result::const_iterator itr = res.begin();
393          itr != res.end(); ++itr) {
394       int64_t id;
395       int version, changeset;
396
397       if (!(*itr)[0].to<int64_t>(id)) {
398         throw std::runtime_error("Relation ID is not numeric.");
399       }
400       if (!(*itr)[3].to<int>(version)) {
401         throw std::runtime_error("Version is not numeric.");
402       }
403       if (!(*itr)[4].to<int>(changeset)) {
404         throw std::runtime_error("Changeset ID is not numeric.");
405       }
406
407       tagstream.get(id, &tags);
408       memstream.get(id, &members, &roles);
409
410       osm_relation(id, &members, &roles, &tags, 
411                    reformDate((*itr)[1].c_str()), 
412                    lookup_user((*itr)[2].c_str()), 
413                    version, changeset);
414     }
415   }
416
417   resetList(&tags);
418   resetList(&members);
419   resetList(&roles);
420 }
421
422 int main(int argc, char **argv)
423 {
424   int i;
425   int want_nodes, want_ways, want_relations, want_changesets;
426     
427   if (argc == 1)
428     // note: changesets not enabled by default, yet. client software will need
429     // time to prepare. suggestion is to have them as a separate download.
430     want_nodes = want_ways = want_relations = 1;
431   else {
432     want_nodes = want_ways = want_relations = want_changesets = 0;
433     for(i=1; i<argc; i++) {
434       if (!strcmp(argv[i], "--nodes"))
435         want_nodes = 1;
436       else if (!strcmp(argv[i], "--ways"))
437         want_ways = 1;
438       else if (!strcmp(argv[i], "--relations"))
439         want_relations = 1;
440       else if (!strcmp(argv[i], "--changesets"))
441         want_changesets = 1;
442       else {
443         fprintf(stderr, "Usage error:\n");
444         fprintf(stderr, "\t%s [--nodes] [--ways] [--relations] [--changesets]\n\n", argv[0]);
445         fprintf(stderr, "Writes OSM planet dump to STDOUT. If no flags are specified then nodes, ways and\n");
446         fprintf(stderr, "relations are output. If one or more flags are set then only the requested data\n");
447         fprintf(stderr, "is dumped.\n");
448         exit(2);
449       }
450     }
451   }
452
453   char *connection_params = NULL;
454   if ((connection_params = getenv("CONNECTION_PARAMS")) == NULL) {
455     fprintf(stderr, "ERROR: you must set the $CONNECTION_PARAMS environment "
456             "variable to the appropriate connection parameters.\n");
457     exit(2);
458   }
459
460   try {
461     // open database connection
462     pqxx::connection conn(connection_params);
463     pqxx::work xaction(conn);
464     xaction.exec("set transaction isolation level serializable read only");
465
466     fetch_users(xaction);
467
468     osm_header();
469     
470     if (want_changesets)
471       changesets(xaction);
472
473     if (want_nodes)
474       nodes(xaction);
475
476     if (want_ways)
477       ways(xaction);
478     
479     if (want_relations)
480       relations(xaction);
481
482     osm_footer();
483     
484     free_users();
485
486     // rollback happens automatically here
487   } catch (const std::exception &e) {
488     fprintf(stderr, "ERROR: %s\n", e.what());
489     return 1;
490
491   } catch (...) {
492     fprintf(stderr, "UNKNOWN ERROR!\n");
493     return 1;
494   }
495
496   return 0;
497 }