Fix compiler warnings
[planetdump.git] / planet06_pg.cpp
1 #ifndef _GNU_SOURCE
2 #define _GNU_SOURCE
3 #endif
4
5 #undef USE_ICONV
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdarg.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/socket.h>
14 #include <sys/un.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <limits.h>
19 #include <time.h>
20 #include <utime.h>
21
22 #include <pqxx/pqxx>
23 #include <stdexcept>
24 #include <signal.h>
25 #include <stdarg.h>
26 #include <assert.h>
27 #include <cstdlib>
28 #include <cstdint>
29
30 #include "users.hpp"
31 extern "C" {
32 #include "keyvals.h"
33 #include "output_osm.h"
34 }
35
36 #define SCALE 10000000.0
37
38 using namespace std;
39 using namespace pqxx;
40
41 const char *reformDate(const char *str)
42 {
43     static char out[64], prev[64]; // Not thread safe
44
45     struct tm tm;
46
47     // Re-use the previous answer if we asked to convert the same timestamp twice
48     // This accelerates bulk uploaded data where sequential features often have the same timestamp
49     if (!strncmp(prev, str, sizeof(prev)))
50         return out;
51     else
52         strncpy(prev, str, sizeof(prev));
53
54     // 2007-05-20 13:51:35
55     bzero(&tm, sizeof(tm));
56     int n = sscanf(str, "%d-%d-%d %d:%d:%d",
57                    &tm.tm_year, &tm.tm_mon, &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec);
58
59     if (n !=6)
60         printf("failed to parse date string, got(%d): %s\n", n, str);
61
62     tm.tm_year -= 1900;
63     tm.tm_mon  -= 1;
64     tm.tm_isdst = -1;
65
66     //2007-07-10T11:32:32Z
67     snprintf(out, sizeof(out), "%d-%02d-%02dT%02d:%02d:%02dZ",
68              tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
69
70     return out;
71 }
72
73 /**
74  * Uses a cursor through a particular table to give the appearance 
75  * of an array of elements. This only works if the IDs are accessed 
76  * in a strictly ascending order and its only efficient if most of 
77  * the elements are accessed.
78  */
79 template <typename id_type>
80 class table_stream {
81 public:
82
83   table_stream(pqxx::work &x, string query, string name) 
84     : stream(x, query, name, 10000),
85       ic_itr(stream), ic_end(), next_id(0) {
86     // initialise the iterators to the beginning of the table
87     // and read the first element from it.
88     r_itr = ic_itr->begin();
89
90     if (r_itr != ic_itr->end()) {
91       if (!(*r_itr)[0].to<id_type>(next_id)) {
92         throw std::runtime_error("ID is not numeric.");
93       }
94     } else {
95       next_id = std::numeric_limits<id_type>::max();
96     }
97   }
98
99   bool find_id(id_type id) {
100     while (id > next_id) {
101       next();
102     }
103
104     return id == next_id;
105   }
106
107 private:
108
109   icursorstream stream;
110   icursor_iterator ic_itr;
111   const icursor_iterator ic_end;
112
113 protected:
114
115   void next() {
116     ++r_itr;
117     if (r_itr == ic_itr->end()) {
118       // we're at the end of this chunk, so grab another chunk from
119       // the cursor
120       ++ic_itr;
121       if (ic_itr == ic_end) {
122         // reached the end of the table, so flag this to the other
123         // routines by setting next_id to an invalid value
124         next_id = std::numeric_limits<id_type>::max();
125       } else {
126         r_itr = ic_itr->begin();
127       }
128     }
129
130     // get the next ID unless the end of the table has been hit
131     if (next_id < std::numeric_limits<id_type>::max()) {
132       if (!(*r_itr)[0].to<id_type>(next_id)) {
133         throw std::runtime_error("Next ID is non-numeric.");
134       }
135     }
136   }
137
138   result::const_iterator r_itr;
139   id_type next_id;
140 };
141
142 /**
143  * extends the table stream to provide utility methods for getting
144  * tags out of the table.
145  */
146 struct tag_stream 
147   : public table_stream<int64_t> {
148
149    tag_stream(pqxx::work &x, const char *table, const char *type) 
150       : table_stream<int64_t>(x, query(table, type), "fetch_tags") {
151   }
152
153   bool get(int64_t id, struct keyval *kv) {
154     bool has_tags = find_id(id);
155     resetList(kv);
156     if (has_tags) {
157       while (id == next_id) {
158         addItem(kv, (*r_itr)[1].c_str(), (*r_itr)[2].c_str(), 0);
159         next();
160       }
161     }
162     return has_tags;
163   }
164
165 private:
166
167   string query(const char *table, const char *type) {
168     ostringstream ostr;
169     ostr << "select " << type << "_id, k, v from " << table << " order by " << type << "_id, k";
170     return ostr.str();
171   }
172 };
173
174 /**
175  * gets way nodes out of the stream and returns them in the 
176  * keyval struct that the output functions are expecting.
177  */
178 struct way_node_stream 
179   : public table_stream<int64_t> {
180
181   way_node_stream(pqxx::work &x) 
182     : table_stream<int64_t>(x, "select way_id, node_id from current_way_nodes "
183                             "ORDER BY way_id, sequence_id", "fetch_way_nodes") {
184   }
185
186   bool get(int64_t id, struct keyval *kv) {
187     bool has_nodes = find_id(id);
188     resetList(kv);
189     if (has_nodes) {
190       while (id == next_id) {
191         addItem(kv, "", (*r_itr)[1].c_str(), 0);
192         next();
193       }
194     }
195     return has_nodes;
196   }
197 };
198
199 /**
200  * gets relation members out of the stream and returns them in the 
201  * pair of keyval structs that the output functions are expecting.
202  */
203 struct relation_member_stream 
204   : public table_stream<int64_t> {
205
206   relation_member_stream(pqxx::work &x) 
207     : table_stream<int64_t>(x, "select relation_id, member_id, member_type, "
208                             "member_role from current_relation_members "
209                             "ORDER BY relation_id, sequence_id", "fetch_relation_members") {
210   }
211
212   bool get(int64_t id, struct keyval *members, struct keyval *roles) {
213     bool has_members = find_id(id);
214     resetList(members);
215     resetList(roles);
216     if (has_members) {
217       while (id == next_id) {
218         addItem(members, (*r_itr)[2].c_str(), (*r_itr)[1].c_str(), 0);
219         addItem(roles, "", (*r_itr)[3].c_str(), 0);
220         next();
221       }
222     }
223     return has_members;
224   }
225 };
226
227 void changesets(pqxx::work &xaction) {
228   // work around reformDate returning pointer to static storage :-(
229   static char created_at[64];
230   struct keyval tags;
231   initList(&tags);
232
233   ostringstream query;
234   query << "select id, user_id, created_at, closed_at, num_changes, "
235         << "min_lat, max_lat, min_lon, max_lon, closed_at > now() at time zone 'utc' as open "
236         << "from changesets c order by id";
237
238   icursorstream changesets(xaction, query.str(), "fetch_changesets", 1000);
239   tag_stream tagstream(xaction, "changeset_tags", "changeset");
240
241   const icursor_iterator ic_end;
242   for (icursor_iterator ic_itr(changesets); ic_itr != ic_end; ++ic_itr) {
243     const pqxx::result &res = *ic_itr;
244     for (pqxx::result::const_iterator itr = res.begin();
245          itr != res.end(); ++itr) {
246       const pqxx::result::tuple &row = *itr;
247       const int64_t id = row[0].as<int64_t>();
248       const bool null_bbox = row[5].is_null() || row[6].is_null() || row[7].is_null() || row[8].is_null();
249
250       if (!tagstream.get(id, &tags)) {
251         resetList(&tags);
252       }
253
254       // work around reformDate returning pointer to static data
255       strncpy(created_at, reformDate(row[2].c_str()), 64);
256
257       osm_changeset(id,
258                     lookup_user(row[1].c_str()), // user_id
259                     created_at, // created_at
260                     reformDate(row[3].c_str()), // closed_at
261                     row[4].as<int>(), // num_changes
262                     null_bbox ? 0 : 1,
263                     null_bbox ? 0 : row[5].as<int>() / SCALE, // min_lat
264                     null_bbox ? 0 : row[6].as<int>() / SCALE, // max_lat
265                     null_bbox ? 0 : row[7].as<int>() / SCALE, // min_lon
266                     null_bbox ? 0 : row[8].as<int>() / SCALE, // max_lon
267                     row[9].as<bool>() ? 1 : 0, // open
268                     &tags);
269     }
270   }
271
272   resetList(&tags);
273 }
274
275 void nodes(pqxx::work &xaction) {
276   struct keyval tags;
277   initList(&tags);
278   
279   ostringstream query;
280   query << "select n.id, n.latitude, n.longitude, n.timestamp, "
281         << "c.user_id, n.version, n.changeset_id "
282         << "from current_nodes n join changesets c on n.changeset_id=c.id "
283         << "where n.visible = true order by n.id";
284   
285   icursorstream nodes(xaction, query.str(), "fetch_nodes", 1000);
286   tag_stream tagstream(xaction, "current_node_tags", "node");
287
288   const icursor_iterator ic_end;
289   for (icursor_iterator ic_itr(nodes); ic_itr != ic_end; ++ic_itr) {
290     const pqxx::result &res = *ic_itr;
291     for (pqxx::result::const_iterator itr = res.begin();
292          itr != res.end(); ++itr) {
293       int64_t id;
294       int version, latitude, longitude, changeset;
295
296       if (!(*itr)[0].to<int64_t>(id)) {
297         throw std::runtime_error("Node ID is not numeric.");
298       }
299       if (!(*itr)[1].to<int>(latitude)) {
300         throw std::runtime_error("Latitude is not numeric.");
301       }
302       if (!(*itr)[2].to<int>(longitude)) {
303         throw std::runtime_error("Longitude is not numeric.");
304       }
305       if (!(*itr)[5].to<int>(version)) {
306         throw std::runtime_error("Version is not numeric.");
307       }
308       if (!(*itr)[6].to<int>(changeset)) {
309         throw std::runtime_error("Changeset ID is not numeric.");
310       }
311
312       if (!tagstream.get(id, &tags)) {
313         resetList(&tags);
314       }
315
316       osm_node(id, latitude / SCALE, longitude / SCALE, &tags, 
317                reformDate((*itr)[3].c_str()), 
318                lookup_user((*itr)[4].c_str()), 
319                version, changeset);
320     }
321   }
322
323   resetList(&tags);
324 }
325
326 void ways(pqxx::work &xaction) {
327   struct keyval tags, nodes;
328   initList(&tags);
329   initList(&nodes);
330   
331   ostringstream query;
332   query << "select w.id, w.timestamp, cs.user_id, w.version, "
333         << "w.changeset_id from current_ways w join changesets cs on "
334         << "w.changeset_id=cs.id where visible = true order by id";
335   
336   icursorstream ways(xaction, query.str(), "fetch_ways", 1000);
337   tag_stream tagstream(xaction, "current_way_tags", "way");
338   way_node_stream nodestream(xaction);
339
340   const icursor_iterator ic_end;
341   for (icursor_iterator ic_itr(ways); ic_itr != ic_end; ++ic_itr) {
342     const pqxx::result &res = *ic_itr;
343     for (pqxx::result::const_iterator itr = res.begin();
344          itr != res.end(); ++itr) {
345       int64_t id;
346       int version, changeset;
347
348       if (!(*itr)[0].to<int64_t>(id)) {
349         throw std::runtime_error("Node ID is not numeric.");
350       }
351       if (!(*itr)[3].to<int>(version)) {
352         throw std::runtime_error("Version is not numeric.");
353       }
354       if (!(*itr)[4].to<int>(changeset)) {
355         throw std::runtime_error("Changeset ID is not numeric.");
356       }
357
358       tagstream.get(id, &tags);
359       nodestream.get(id, &nodes);
360
361       osm_way(id, &nodes, &tags, 
362               reformDate((*itr)[1].c_str()), 
363               lookup_user((*itr)[2].c_str()), 
364               version, changeset);
365     }
366   }
367
368   resetList(&tags);
369   resetList(&nodes);
370 }
371
372 void relations(pqxx::work &xaction) {
373   struct keyval tags, members, roles;
374   initList(&tags);
375   initList(&members);
376   initList(&roles);
377   
378   ostringstream query;
379   query << "select r.id, r.timestamp, c.user_id, r.version, r.changeset_id "
380         << "from current_relations r join changesets c on "
381         << "r.changeset_id=c.id where visible = true ORDER BY id";
382   
383   icursorstream relations(xaction, query.str(), "fetch_relations", 1000);
384   tag_stream tagstream(xaction, "current_relation_tags", "relation");
385   relation_member_stream memstream(xaction);
386
387   const icursor_iterator ic_end;
388   for (icursor_iterator ic_itr(relations); ic_itr != ic_end; ++ic_itr) {
389     const pqxx::result &res = *ic_itr;
390     for (pqxx::result::const_iterator itr = res.begin();
391          itr != res.end(); ++itr) {
392       int64_t id;
393       int version, changeset;
394
395       if (!(*itr)[0].to<int64_t>(id)) {
396         throw std::runtime_error("Relation ID is not numeric.");
397       }
398       if (!(*itr)[3].to<int>(version)) {
399         throw std::runtime_error("Version is not numeric.");
400       }
401       if (!(*itr)[4].to<int>(changeset)) {
402         throw std::runtime_error("Changeset ID is not numeric.");
403       }
404
405       tagstream.get(id, &tags);
406       memstream.get(id, &members, &roles);
407
408       osm_relation(id, &members, &roles, &tags, 
409                    reformDate((*itr)[1].c_str()), 
410                    lookup_user((*itr)[2].c_str()), 
411                    version, changeset);
412     }
413   }
414
415   resetList(&tags);
416   resetList(&members);
417   resetList(&roles);
418 }
419
420 int main(int argc, char **argv)
421 {
422   int i;
423   int want_nodes, want_ways, want_relations, want_changesets;
424     
425   if (argc == 1)
426     // note: changesets not enabled by default, yet. client software will need
427     // time to prepare. suggestion is to have them as a separate download.
428     want_nodes = want_ways = want_relations = 1;
429   else {
430     want_nodes = want_ways = want_relations = want_changesets = 0;
431     for(i=1; i<argc; i++) {
432       if (!strcmp(argv[i], "--nodes"))
433         want_nodes = 1;
434       else if (!strcmp(argv[i], "--ways"))
435         want_ways = 1;
436       else if (!strcmp(argv[i], "--relations"))
437         want_relations = 1;
438       else if (!strcmp(argv[i], "--changesets"))
439         want_changesets = 1;
440       else {
441         fprintf(stderr, "Usage error:\n");
442         fprintf(stderr, "\t%s [--nodes] [--ways] [--relations] [--changesets]\n\n", argv[0]);
443         fprintf(stderr, "Writes OSM planet dump to STDOUT. If no flags are specified then nodes, ways and\n");
444         fprintf(stderr, "relations are output. If one or more flags are set then only the requested data\n");
445         fprintf(stderr, "is dumped.\n");
446         exit(2);
447       }
448     }
449   }
450
451   char *connection_params = NULL;
452   if ((connection_params = getenv("CONNECTION_PARAMS")) == NULL) {
453     fprintf(stderr, "ERROR: you must set the $CONNECTION_PARAMS environment "
454             "variable to the appropriate connection parameters.\n");
455     exit(2);
456   }
457
458   try {
459     // open database connection
460     pqxx::connection conn(connection_params);
461     pqxx::work xaction(conn);
462     xaction.exec("set transaction isolation level serializable read only");
463
464     fetch_users(xaction);
465
466     osm_header();
467     
468     if (want_changesets)
469       changesets(xaction);
470
471     if (want_nodes)
472       nodes(xaction);
473
474     if (want_ways)
475       ways(xaction);
476     
477     if (want_relations)
478       relations(xaction);
479
480     osm_footer();
481     
482     free_users();
483
484     // rollback happens automatically here
485   } catch (const std::exception &e) {
486     fprintf(stderr, "ERROR: %s\n", e.what());
487     return 1;
488
489   } catch (...) {
490     fprintf(stderr, "UNKNOWN ERROR!\n");
491     return 1;
492   }
493
494   return 0;
495 }