lowercase relation member types on output.
[planetdump.git] / planet06_pg.cpp
1 #ifndef _GNU_SOURCE
2 #define _GNU_SOURCE
3 #endif
4
5 #undef USE_ICONV
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdarg.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/socket.h>
14 #include <sys/un.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <limits.h>
19 #include <time.h>
20 #include <utime.h>
21
22 #include <pqxx/pqxx>
23 #include <stdexcept>
24 #include <signal.h>
25 #include <stdarg.h>
26 #include <assert.h>
27 #include <cstdlib>
28
29 #include "users.hpp"
30 extern "C" {
31 #include "keyvals.h"
32 #include "output_osm.h"
33 }
34
35 #define SCALE 10000000.0
36
37 using namespace std;
38 using namespace pqxx;
39
40 const char *reformDate(const char *str)
41 {
42     static char out[64], prev[64]; // Not thread safe
43
44     time_t tmp;
45     struct tm tm;
46
47     // Re-use the previous answer if we asked to convert the same timestamp twice
48     // This accelerates bulk uploaded data where sequential features often have the same timestamp
49     if (!strncmp(prev, str, sizeof(prev)))
50         return out;
51     else
52         strncpy(prev, str, sizeof(prev));
53
54     // 2007-05-20 13:51:35
55     bzero(&tm, sizeof(tm));
56     int n = sscanf(str, "%d-%d-%d %d:%d:%d",
57                    &tm.tm_year, &tm.tm_mon, &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec);
58
59     if (n !=6)
60         printf("failed to parse date string, got(%d): %s\n", n, str);
61
62     tm.tm_year -= 1900;
63     tm.tm_mon  -= 1;
64     tm.tm_isdst = -1;
65
66     // Rails stores the timestamps in the DB using UK localtime (ugh), convert to UTC
67     tmp = mktime(&tm);
68     gmtime_r(&tmp, &tm);
69
70     //2007-07-10T11:32:32Z
71     snprintf(out, sizeof(out), "%d-%02d-%02dT%02d:%02d:%02dZ",
72              tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
73
74     return out;
75 }
76
77 /**
78  * Uses a cursor through a particular table to give the appearance 
79  * of an array of elements. This only works if the IDs are accessed 
80  * in a strictly ascending order and its only efficient if most of 
81  * the elements are accessed.
82  */
83 template <typename id_type>
84 class table_stream {
85 public:
86
87   table_stream(pqxx::work &x, string query, string name) 
88     : stream(x, query, name, 1000),
89       ic_itr(stream), ic_end(), next_id(0) {
90     // initialise the iterators to the beginning of the table
91     // and read the first element from it.
92     r_itr = ic_itr->begin();
93
94     if (r_itr != ic_itr->end()) {
95       if (!(*r_itr)[0].to<id_type>(next_id)) {
96         throw std::runtime_error("ID is not numeric.");
97       }
98     }
99   }
100
101   bool find_id(id_type id) {
102     while (id > next_id) {
103       next();
104     }
105
106     return id == next_id;
107   }
108
109 private:
110
111   icursorstream stream;
112   icursor_iterator ic_itr;
113   const icursor_iterator ic_end;
114
115 protected:
116
117   void next() {
118     ++r_itr;
119     if (r_itr == ic_itr->end()) {
120       // we're at the end of this chunk, so grab another chunk from
121       // the cursor
122       ++ic_itr;
123       if (ic_itr == ic_end) {
124         // reached the end of the table, so flag this to the other
125         // routines by setting next_id to an invalid value
126         next_id = std::numeric_limits<id_type>::max();
127       } else {
128         r_itr = ic_itr->begin();
129       }
130     }
131
132     // get the next ID unless the end of the table has been hit
133     if (next_id < std::numeric_limits<id_type>::max()) {
134       if (!(*r_itr)[0].to<id_type>(next_id)) {
135         throw std::runtime_error("Next ID is non-numeric.");
136       }
137     }
138   }
139
140   result::const_iterator r_itr;
141   id_type next_id;
142 };
143
144 /**
145  * extends the table stream to provide utility methods for getting
146  * tags out of the table.
147  */
148 struct tag_stream 
149   : public table_stream<int> {
150
151   tag_stream(pqxx::work &x, const char *table) 
152     : table_stream<int>(x, query(table), "fetch_tags") {
153   }
154
155   bool get(int id, struct keyval *kv) {
156     bool has_tags = find_id(id);
157     resetList(kv);
158     if (has_tags) {
159       while (id == next_id) {
160         addItem(kv, (*r_itr)[1].c_str(), (*r_itr)[2].c_str(), 0);
161         next();
162       }
163     }
164     return has_tags;
165   }
166
167 private:
168
169   string query(const char *table) {
170     ostringstream ostr;
171     ostr << "select id, k, v from " << table << " order by id, k";
172     return ostr.str();
173   }
174 };
175
176 /**
177  * gets way nodes out of the stream and returns them in the 
178  * keyval struct that the output functions are expecting.
179  */
180 struct way_node_stream 
181   : public table_stream<int> {
182
183   way_node_stream(pqxx::work &x) 
184     : table_stream<int>(x, "select id, node_id from current_way_nodes "
185                         "ORDER BY id, sequence_id", "fetch_way_nodes") {
186   }
187
188   bool get(int id, struct keyval *kv) {
189     bool has_nodes = find_id(id);
190     resetList(kv);
191     if (has_nodes) {
192       while (id == next_id) {
193         addItem(kv, "", (*r_itr)[1].c_str(), 0);
194         next();
195       }
196     }
197     return has_nodes;
198   }
199 };
200
201 /**
202  * gets relation members out of the stream and returns them in the 
203  * pair of keyval structs that the output functions are expecting.
204  */
205 struct relation_member_stream 
206   : public table_stream<int> {
207
208   relation_member_stream(pqxx::work &x) 
209     : table_stream<int>(x, "select id, member_id, member_type, "
210                         "lower(member_role) from current_relation_members "
211                         "ORDER BY id, sequence_id", "fetch_relation_members") {
212   }
213
214   bool get(int id, struct keyval *members, struct keyval *roles) {
215     bool has_members = find_id(id);
216     resetList(members);
217     resetList(roles);
218     if (has_members) {
219       while (id == next_id) {
220         addItem(members, (*r_itr)[2].c_str(), (*r_itr)[1].c_str(), 0);
221         addItem(roles, "", (*r_itr)[3].c_str(), 0);
222         next();
223       }
224     }
225     return has_members;
226   }
227 };
228
229 void nodes(pqxx::work &xaction) {
230   struct keyval tags;
231   initList(&tags);
232   
233   ostringstream query;
234   query << "select n.id, n.latitude, n.longitude, n.timestamp, "
235         << "c.user_id, n.version, n.changeset_id "
236         << "from current_nodes n join changesets c on n.changeset_id=c.id "
237         << "where n.visible = true order by n.id";
238   
239   icursorstream nodes(xaction, query.str(), "fetch_nodes", 1000);
240   tag_stream tagstream(xaction, "current_node_tags");
241
242   const icursor_iterator ic_end;
243   for (icursor_iterator ic_itr(nodes); ic_itr != ic_end; ++ic_itr) {
244     const pqxx::result &res = *ic_itr;
245     for (pqxx::result::const_iterator itr = res.begin();
246          itr != res.end(); ++itr) {
247       int id, version, latitude, longitude, changeset;
248
249       if (!(*itr)[0].to<int>(id)) {
250         throw std::runtime_error("Node ID is not numeric.");
251       }
252       if (!(*itr)[1].to<int>(latitude)) {
253         throw std::runtime_error("Latitude is not numeric.");
254       }
255       if (!(*itr)[2].to<int>(longitude)) {
256         throw std::runtime_error("Longitude is not numeric.");
257       }
258       if (!(*itr)[5].to<int>(version)) {
259         throw std::runtime_error("Version is not numeric.");
260       }
261       if (!(*itr)[6].to<int>(changeset)) {
262         throw std::runtime_error("Changeset ID is not numeric.");
263       }
264
265       if (!tagstream.get(id, &tags)) {
266         resetList(&tags);
267       }
268
269       osm_node(id, latitude / SCALE, longitude / SCALE, &tags, 
270                reformDate((*itr)[3].c_str()), 
271                lookup_user((*itr)[4].c_str()), 
272                version, changeset);
273     }
274   }
275
276   resetList(&tags);
277 }
278
279 void ways(pqxx::work &xaction) {
280   struct keyval tags, nodes;
281   initList(&tags);
282   initList(&nodes);
283   
284   ostringstream query;
285   query << "select w.id, w.timestamp, cs.user_id, w.version, "
286         << "w.changeset_id from current_ways w join changesets cs on "
287         << "w.changeset_id=cs.id where visible = true order by id";
288   
289   icursorstream ways(xaction, query.str(), "fetch_ways", 1000);
290   tag_stream tagstream(xaction, "current_way_tags");
291   way_node_stream nodestream(xaction);
292
293   const icursor_iterator ic_end;
294   for (icursor_iterator ic_itr(ways); ic_itr != ic_end; ++ic_itr) {
295     const pqxx::result &res = *ic_itr;
296     for (pqxx::result::const_iterator itr = res.begin();
297          itr != res.end(); ++itr) {
298       int id, version, changeset;
299
300       if (!(*itr)[0].to<int>(id)) {
301         throw std::runtime_error("Node ID is not numeric.");
302       }
303       if (!(*itr)[3].to<int>(version)) {
304         throw std::runtime_error("Version is not numeric.");
305       }
306       if (!(*itr)[4].to<int>(changeset)) {
307         throw std::runtime_error("Changeset ID is not numeric.");
308       }
309
310       tagstream.get(id, &tags);
311       nodestream.get(id, &nodes);
312
313       osm_way(id, &nodes, &tags, 
314               reformDate((*itr)[1].c_str()), 
315               lookup_user((*itr)[2].c_str()), 
316               version, changeset);
317     }
318   }
319
320   resetList(&tags);
321   resetList(&nodes);
322 }
323
324 void relations(pqxx::work &xaction) {
325   struct keyval tags, members, roles;
326   initList(&tags);
327   initList(&members);
328   initList(&roles);
329   
330   ostringstream query;
331   query << "select r.id, r.timestamp, c.user_id, r.version, r.changeset_id "
332         << "from current_relations r join changesets c on "
333         << "r.changeset_id=c.id where visible = true ORDER BY id";
334   
335   icursorstream relations(xaction, query.str(), "fetch_relations", 1000);
336   tag_stream tagstream(xaction, "current_relation_tags");
337   relation_member_stream memstream(xaction);
338
339   const icursor_iterator ic_end;
340   for (icursor_iterator ic_itr(relations); ic_itr != ic_end; ++ic_itr) {
341     const pqxx::result &res = *ic_itr;
342     for (pqxx::result::const_iterator itr = res.begin();
343          itr != res.end(); ++itr) {
344       int id, version, changeset;
345
346       if (!(*itr)[0].to<int>(id)) {
347         throw std::runtime_error("Relation ID is not numeric.");
348       }
349       if (!(*itr)[3].to<int>(version)) {
350         throw std::runtime_error("Version is not numeric.");
351       }
352       if (!(*itr)[4].to<int>(changeset)) {
353         throw std::runtime_error("Changeset ID is not numeric.");
354       }
355
356       tagstream.get(id, &tags);
357       memstream.get(id, &members, &roles);
358
359       osm_relation(id, &members, &roles, &tags, 
360                    reformDate((*itr)[1].c_str()), 
361                    lookup_user((*itr)[2].c_str()), 
362                    version, changeset);
363     }
364   }
365
366   resetList(&tags);
367   resetList(&members);
368   resetList(&roles);
369 }
370
371 int main(int argc, char **argv)
372 {
373   int i;
374   int want_nodes, want_ways, want_relations;
375     
376   if (argc == 1)
377     want_nodes = want_ways = want_relations = 1;
378   else {
379     want_nodes = want_ways = want_relations = 0;
380     for(i=1; i<argc; i++) {
381       if (!strcmp(argv[i], "--nodes"))
382         want_nodes = 1;
383       else if (!strcmp(argv[i], "--ways"))
384         want_ways = 1;
385       else if (!strcmp(argv[i], "--relations"))
386         want_relations = 1;
387       else {
388         fprintf(stderr, "Usage error:\n");
389         fprintf(stderr, "\t%s [--nodes] [--ways] [--relations]\n\n", argv[0]);
390         fprintf(stderr, "Writes OSM planet dump to STDOUT. If no flags are specified then all data is output.\n");
391         fprintf(stderr, "If one or more flags are set then only the requested data is dumped.\n");
392         exit(2);
393       }
394     }
395   }
396
397   char *connection_params = NULL;
398   if ((connection_params = getenv("CONNECTION_PARAMS")) == NULL) {
399     fprintf(stderr, "ERROR: you must set the $CONNECTION_PARAMS environment "
400             "variable to the appropriate connection parameters.\n");
401     exit(2);
402   }
403
404   try {
405     // open database connection
406     pqxx::connection conn(connection_params);
407     pqxx::work xaction(conn);
408
409     fetch_users(xaction);
410
411     osm_header();
412     
413     if (want_nodes)
414       nodes(xaction);
415
416     if (want_ways)
417       ways(xaction);
418     
419     if (want_relations)
420       relations(xaction);
421
422     osm_footer();
423     
424     free_users();
425
426     // rollback happens automatically here
427   } catch (const std::exception &e) {
428     fprintf(stderr, "ERROR: %s\n", e.what());
429     return 1;
430
431   } catch (...) {
432     fprintf(stderr, "UNKNOWN ERROR!\n");
433     return 1;
434   }
435
436   return 0;
437 }