Fixed bug calling reformDate twice. Since it returns a pointer to static data, it...
[planetdump.git] / planet06_pg.cpp
1 #ifndef _GNU_SOURCE
2 #define _GNU_SOURCE
3 #endif
4
5 #undef USE_ICONV
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdarg.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/socket.h>
14 #include <sys/un.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <limits.h>
19 #include <time.h>
20 #include <utime.h>
21
22 #include <pqxx/pqxx>
23 #include <stdexcept>
24 #include <signal.h>
25 #include <stdarg.h>
26 #include <assert.h>
27 #include <cstdlib>
28
29 #include "users.hpp"
30 extern "C" {
31 #include "keyvals.h"
32 #include "output_osm.h"
33 }
34
35 #define SCALE 10000000.0
36
37 using namespace std;
38 using namespace pqxx;
39
40 const char *reformDate(const char *str)
41 {
42     static char out[64], prev[64]; // Not thread safe
43
44     time_t tmp;
45     struct tm tm;
46
47     // Re-use the previous answer if we asked to convert the same timestamp twice
48     // This accelerates bulk uploaded data where sequential features often have the same timestamp
49     if (!strncmp(prev, str, sizeof(prev)))
50         return out;
51     else
52         strncpy(prev, str, sizeof(prev));
53
54     // 2007-05-20 13:51:35
55     bzero(&tm, sizeof(tm));
56     int n = sscanf(str, "%d-%d-%d %d:%d:%d",
57                    &tm.tm_year, &tm.tm_mon, &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec);
58
59     if (n !=6)
60         printf("failed to parse date string, got(%d): %s\n", n, str);
61
62     tm.tm_year -= 1900;
63     tm.tm_mon  -= 1;
64     tm.tm_isdst = -1;
65
66     // Rails stores the timestamps in the DB using UK localtime (ugh), convert to UTC
67     tmp = mktime(&tm);
68     gmtime_r(&tmp, &tm);
69
70     //2007-07-10T11:32:32Z
71     snprintf(out, sizeof(out), "%d-%02d-%02dT%02d:%02d:%02dZ",
72              tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
73
74     return out;
75 }
76
77 /**
78  * Uses a cursor through a particular table to give the appearance 
79  * of an array of elements. This only works if the IDs are accessed 
80  * in a strictly ascending order and its only efficient if most of 
81  * the elements are accessed.
82  */
83 template <typename id_type>
84 class table_stream {
85 public:
86
87   table_stream(pqxx::work &x, string query, string name) 
88     : stream(x, query, name, 10000),
89       ic_itr(stream), ic_end(), next_id(0) {
90     // initialise the iterators to the beginning of the table
91     // and read the first element from it.
92     r_itr = ic_itr->begin();
93
94     if (r_itr != ic_itr->end()) {
95       if (!(*r_itr)[0].to<id_type>(next_id)) {
96         throw std::runtime_error("ID is not numeric.");
97       }
98     } else {
99       next_id = std::numeric_limits<id_type>::max();
100     }
101   }
102
103   bool find_id(id_type id) {
104     while (id > next_id) {
105       next();
106     }
107
108     return id == next_id;
109   }
110
111 private:
112
113   icursorstream stream;
114   icursor_iterator ic_itr;
115   const icursor_iterator ic_end;
116
117 protected:
118
119   void next() {
120     ++r_itr;
121     if (r_itr == ic_itr->end()) {
122       // we're at the end of this chunk, so grab another chunk from
123       // the cursor
124       ++ic_itr;
125       if (ic_itr == ic_end) {
126         // reached the end of the table, so flag this to the other
127         // routines by setting next_id to an invalid value
128         next_id = std::numeric_limits<id_type>::max();
129       } else {
130         r_itr = ic_itr->begin();
131       }
132     }
133
134     // get the next ID unless the end of the table has been hit
135     if (next_id < std::numeric_limits<id_type>::max()) {
136       if (!(*r_itr)[0].to<id_type>(next_id)) {
137         throw std::runtime_error("Next ID is non-numeric.");
138       }
139     }
140   }
141
142   result::const_iterator r_itr;
143   id_type next_id;
144 };
145
146 /**
147  * extends the table stream to provide utility methods for getting
148  * tags out of the table.
149  */
150 struct tag_stream 
151   : public table_stream<int> {
152
153   tag_stream(pqxx::work &x, const char *table) 
154     : table_stream<int>(x, query(table), "fetch_tags") {
155   }
156
157   bool get(int id, struct keyval *kv) {
158     bool has_tags = find_id(id);
159     resetList(kv);
160     if (has_tags) {
161       while (id == next_id) {
162         addItem(kv, (*r_itr)[1].c_str(), (*r_itr)[2].c_str(), 0);
163         next();
164       }
165     }
166     return has_tags;
167   }
168
169 private:
170
171   string query(const char *table) {
172     ostringstream ostr;
173     ostr << "select id, k, v from " << table << " order by id, k";
174     return ostr.str();
175   }
176 };
177
178 /**
179  * gets way nodes out of the stream and returns them in the 
180  * keyval struct that the output functions are expecting.
181  */
182 struct way_node_stream 
183   : public table_stream<int> {
184
185   way_node_stream(pqxx::work &x) 
186     : table_stream<int>(x, "select id, node_id from current_way_nodes "
187                         "ORDER BY id, sequence_id", "fetch_way_nodes") {
188   }
189
190   bool get(int id, struct keyval *kv) {
191     bool has_nodes = find_id(id);
192     resetList(kv);
193     if (has_nodes) {
194       while (id == next_id) {
195         addItem(kv, "", (*r_itr)[1].c_str(), 0);
196         next();
197       }
198     }
199     return has_nodes;
200   }
201 };
202
203 /**
204  * gets relation members out of the stream and returns them in the 
205  * pair of keyval structs that the output functions are expecting.
206  */
207 struct relation_member_stream 
208   : public table_stream<int> {
209
210   relation_member_stream(pqxx::work &x) 
211     : table_stream<int>(x, "select id, member_id, member_type, "
212                         "lower(member_role) from current_relation_members "
213                         "ORDER BY id, sequence_id", "fetch_relation_members") {
214   }
215
216   bool get(int id, struct keyval *members, struct keyval *roles) {
217     bool has_members = find_id(id);
218     resetList(members);
219     resetList(roles);
220     if (has_members) {
221       while (id == next_id) {
222         addItem(members, (*r_itr)[2].c_str(), (*r_itr)[1].c_str(), 0);
223         addItem(roles, "", (*r_itr)[3].c_str(), 0);
224         next();
225       }
226     }
227     return has_members;
228   }
229 };
230
231 void changesets(pqxx::work &xaction) {
232   // work around reformDate returning pointer to static storage :-(
233   static char created_at[64];
234   struct keyval tags;
235   initList(&tags);
236
237   ostringstream query;
238   query << "select id, user_id, created_at, closed_at, num_changes, "
239         << "min_lat, max_lat, min_lon, max_lon, closed_at > now() as open "
240         << "from changesets c order by id";
241
242   icursorstream changesets(xaction, query.str(), "fetch_changesets", 1000);
243   tag_stream tagstream(xaction, "changeset_tags");
244
245   const icursor_iterator ic_end;
246   for (icursor_iterator ic_itr(changesets); ic_itr != ic_end; ++ic_itr) {
247     const pqxx::result &res = *ic_itr;
248     for (pqxx::result::const_iterator itr = res.begin();
249          itr != res.end(); ++itr) {
250       const pqxx::result::tuple &row = *itr;
251       const int id = row[0].as<int>();
252       const int num_changes = row[4].as<int>();
253       const bool null_bbox = row[5].is_null() || row[6].is_null() || row[7].is_null() || row[8].is_null();
254
255       if (!tagstream.get(id, &tags)) {
256         resetList(&tags);
257       }
258
259       // work around reformDate returning pointer to static data
260       strncpy(created_at, reformDate(row[2].c_str()), 64);
261
262       osm_changeset(id,
263                     lookup_user(row[1].c_str()), // user_id
264                     created_at, // created_at
265                     reformDate(row[3].c_str()), // closed_at
266                     null_bbox ? 0 : 1,
267                     null_bbox ? 0 : row[5].as<int>() / SCALE, // min_lat
268                     null_bbox ? 0 : row[6].as<int>() / SCALE, // max_lat
269                     null_bbox ? 0 : row[7].as<int>() / SCALE, // min_lon
270                     null_bbox ? 0 : row[8].as<int>() / SCALE, // max_lon
271                     row[9].as<bool>() ? 1 : 0, // open
272                     &tags);
273     }
274   }
275
276   resetList(&tags);
277 }
278
279 void nodes(pqxx::work &xaction) {
280   struct keyval tags;
281   initList(&tags);
282   
283   ostringstream query;
284   query << "select n.id, n.latitude, n.longitude, n.timestamp, "
285         << "c.user_id, n.version, n.changeset_id "
286         << "from current_nodes n join changesets c on n.changeset_id=c.id "
287         << "where n.visible = true order by n.id";
288   
289   icursorstream nodes(xaction, query.str(), "fetch_nodes", 1000);
290   tag_stream tagstream(xaction, "current_node_tags");
291
292   const icursor_iterator ic_end;
293   for (icursor_iterator ic_itr(nodes); ic_itr != ic_end; ++ic_itr) {
294     const pqxx::result &res = *ic_itr;
295     for (pqxx::result::const_iterator itr = res.begin();
296          itr != res.end(); ++itr) {
297       int id, version, latitude, longitude, changeset;
298
299       if (!(*itr)[0].to<int>(id)) {
300         throw std::runtime_error("Node ID is not numeric.");
301       }
302       if (!(*itr)[1].to<int>(latitude)) {
303         throw std::runtime_error("Latitude is not numeric.");
304       }
305       if (!(*itr)[2].to<int>(longitude)) {
306         throw std::runtime_error("Longitude is not numeric.");
307       }
308       if (!(*itr)[5].to<int>(version)) {
309         throw std::runtime_error("Version is not numeric.");
310       }
311       if (!(*itr)[6].to<int>(changeset)) {
312         throw std::runtime_error("Changeset ID is not numeric.");
313       }
314
315       if (!tagstream.get(id, &tags)) {
316         resetList(&tags);
317       }
318
319       osm_node(id, latitude / SCALE, longitude / SCALE, &tags, 
320                reformDate((*itr)[3].c_str()), 
321                lookup_user((*itr)[4].c_str()), 
322                version, changeset);
323     }
324   }
325
326   resetList(&tags);
327 }
328
329 void ways(pqxx::work &xaction) {
330   struct keyval tags, nodes;
331   initList(&tags);
332   initList(&nodes);
333   
334   ostringstream query;
335   query << "select w.id, w.timestamp, cs.user_id, w.version, "
336         << "w.changeset_id from current_ways w join changesets cs on "
337         << "w.changeset_id=cs.id where visible = true order by id";
338   
339   icursorstream ways(xaction, query.str(), "fetch_ways", 1000);
340   tag_stream tagstream(xaction, "current_way_tags");
341   way_node_stream nodestream(xaction);
342
343   const icursor_iterator ic_end;
344   for (icursor_iterator ic_itr(ways); ic_itr != ic_end; ++ic_itr) {
345     const pqxx::result &res = *ic_itr;
346     for (pqxx::result::const_iterator itr = res.begin();
347          itr != res.end(); ++itr) {
348       int id, version, changeset;
349
350       if (!(*itr)[0].to<int>(id)) {
351         throw std::runtime_error("Node ID is not numeric.");
352       }
353       if (!(*itr)[3].to<int>(version)) {
354         throw std::runtime_error("Version is not numeric.");
355       }
356       if (!(*itr)[4].to<int>(changeset)) {
357         throw std::runtime_error("Changeset ID is not numeric.");
358       }
359
360       tagstream.get(id, &tags);
361       nodestream.get(id, &nodes);
362
363       osm_way(id, &nodes, &tags, 
364               reformDate((*itr)[1].c_str()), 
365               lookup_user((*itr)[2].c_str()), 
366               version, changeset);
367     }
368   }
369
370   resetList(&tags);
371   resetList(&nodes);
372 }
373
374 void relations(pqxx::work &xaction) {
375   struct keyval tags, members, roles;
376   initList(&tags);
377   initList(&members);
378   initList(&roles);
379   
380   ostringstream query;
381   query << "select r.id, r.timestamp, c.user_id, r.version, r.changeset_id "
382         << "from current_relations r join changesets c on "
383         << "r.changeset_id=c.id where visible = true ORDER BY id";
384   
385   icursorstream relations(xaction, query.str(), "fetch_relations", 1000);
386   tag_stream tagstream(xaction, "current_relation_tags");
387   relation_member_stream memstream(xaction);
388
389   const icursor_iterator ic_end;
390   for (icursor_iterator ic_itr(relations); ic_itr != ic_end; ++ic_itr) {
391     const pqxx::result &res = *ic_itr;
392     for (pqxx::result::const_iterator itr = res.begin();
393          itr != res.end(); ++itr) {
394       int id, version, changeset;
395
396       if (!(*itr)[0].to<int>(id)) {
397         throw std::runtime_error("Relation ID is not numeric.");
398       }
399       if (!(*itr)[3].to<int>(version)) {
400         throw std::runtime_error("Version is not numeric.");
401       }
402       if (!(*itr)[4].to<int>(changeset)) {
403         throw std::runtime_error("Changeset ID is not numeric.");
404       }
405
406       tagstream.get(id, &tags);
407       memstream.get(id, &members, &roles);
408
409       osm_relation(id, &members, &roles, &tags, 
410                    reformDate((*itr)[1].c_str()), 
411                    lookup_user((*itr)[2].c_str()), 
412                    version, changeset);
413     }
414   }
415
416   resetList(&tags);
417   resetList(&members);
418   resetList(&roles);
419 }
420
421 int main(int argc, char **argv)
422 {
423   int i;
424   int want_nodes, want_ways, want_relations, want_changesets;
425     
426   if (argc == 1)
427     // note: changesets not enabled by default, yet. client software will need
428     // time to prepare. suggestion is to have them as a separate download.
429     want_nodes = want_ways = want_relations = 1;
430   else {
431     want_nodes = want_ways = want_relations = want_changesets = 0;
432     for(i=1; i<argc; i++) {
433       if (!strcmp(argv[i], "--nodes"))
434         want_nodes = 1;
435       else if (!strcmp(argv[i], "--ways"))
436         want_ways = 1;
437       else if (!strcmp(argv[i], "--relations"))
438         want_relations = 1;
439       else if (!strcmp(argv[i], "--changesets"))
440         want_changesets = 1;
441       else {
442         fprintf(stderr, "Usage error:\n");
443         fprintf(stderr, "\t%s [--nodes] [--ways] [--relations] [--changesets]\n\n", argv[0]);
444         fprintf(stderr, "Writes OSM planet dump to STDOUT. If no flags are specified then nodes, ways and\n");
445         fprintf(stderr, "relations are output. If one or more flags are set then only the requested data\n");
446         fprintf(stderr, "is dumped.\n");
447         exit(2);
448       }
449     }
450   }
451
452   char *connection_params = NULL;
453   if ((connection_params = getenv("CONNECTION_PARAMS")) == NULL) {
454     fprintf(stderr, "ERROR: you must set the $CONNECTION_PARAMS environment "
455             "variable to the appropriate connection parameters.\n");
456     exit(2);
457   }
458
459   try {
460     // open database connection
461     pqxx::connection conn(connection_params);
462     pqxx::work xaction(conn);
463
464     fetch_users(xaction);
465
466     osm_header();
467     
468     if (want_changesets)
469       changesets(xaction);
470
471     if (want_nodes)
472       nodes(xaction);
473
474     if (want_ways)
475       ways(xaction);
476     
477     if (want_relations)
478       relations(xaction);
479
480     osm_footer();
481     
482     free_users();
483
484     // rollback happens automatically here
485   } catch (const std::exception &e) {
486     fprintf(stderr, "ERROR: %s\n", e.what());
487     return 1;
488
489   } catch (...) {
490     fprintf(stderr, "UNKNOWN ERROR!\n");
491     return 1;
492   }
493
494   return 0;
495 }