Merge remote-tracking branch 'zere/master'
[planetdump.git] / planet06_pg.cpp
1 #ifndef _GNU_SOURCE
2 #define _GNU_SOURCE
3 #endif
4
5 #undef USE_ICONV
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdarg.h>
11 #include <sys/types.h>
12 #include <sys/stat.h>
13 #include <sys/socket.h>
14 #include <sys/un.h>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <errno.h>
18 #include <limits.h>
19 #include <time.h>
20 #include <utime.h>
21
22 #include <pqxx/pqxx>
23 #include <stdexcept>
24 #include <signal.h>
25 #include <stdarg.h>
26 #include <assert.h>
27 #include <cstdlib>
28 #include <cstdint>
29
30 #include "users.hpp"
31 extern "C" {
32 #include "keyvals.h"
33 #include "output_osm.h"
34 }
35
36 #define SCALE 10000000.0
37
38 using namespace std;
39 using namespace pqxx;
40
41 const char *reformDate(const char *str)
42 {
43     static char out[64], prev[64]; // Not thread safe
44
45     struct tm tm;
46
47     // Re-use the previous answer if we asked to convert the same timestamp twice
48     // This accelerates bulk uploaded data where sequential features often have the same timestamp
49     if (!strncmp(prev, str, sizeof(prev)))
50         return out;
51     else
52         strncpy(prev, str, sizeof(prev));
53
54     // 2007-05-20 13:51:35
55     bzero(&tm, sizeof(tm));
56     int n = sscanf(str, "%d-%d-%d %d:%d:%d",
57                    &tm.tm_year, &tm.tm_mon, &tm.tm_mday, &tm.tm_hour, &tm.tm_min, &tm.tm_sec);
58
59     if (n !=6)
60         printf("failed to parse date string, got(%d): %s\n", n, str);
61
62     tm.tm_year -= 1900;
63     tm.tm_mon  -= 1;
64     tm.tm_isdst = -1;
65
66     //2007-07-10T11:32:32Z
67     snprintf(out, sizeof(out), "%d-%02d-%02dT%02d:%02d:%02dZ",
68              tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
69
70     return out;
71 }
72
73 /**
74  * Uses a cursor through a particular table to give the appearance 
75  * of an array of elements. This only works if the IDs are accessed 
76  * in a strictly ascending order and its only efficient if most of 
77  * the elements are accessed.
78  */
79 template <typename id_type>
80 class table_stream {
81 public:
82
83   table_stream(pqxx::work &x, string query, string name) 
84     : stream(x, query, name, 10000),
85       ic_itr(stream), ic_end(), next_id(0) {
86     // initialise the iterators to the beginning of the table
87     // and read the first element from it.
88     r_itr = ic_itr->begin();
89
90     if (r_itr != ic_itr->end()) {
91       if (!(*r_itr)[0].to<id_type>(next_id)) {
92         throw std::runtime_error("ID is not numeric.");
93       }
94     } else {
95       next_id = std::numeric_limits<id_type>::max();
96     }
97   }
98
99   bool find_id(id_type id) {
100     while (id > next_id) {
101       next();
102     }
103
104     return id == next_id;
105   }
106
107 private:
108
109   icursorstream stream;
110   icursor_iterator ic_itr;
111   const icursor_iterator ic_end;
112
113 protected:
114
115   void next() {
116     ++r_itr;
117     if (r_itr == ic_itr->end()) {
118       // we're at the end of this chunk, so grab another chunk from
119       // the cursor
120       ++ic_itr;
121       if (ic_itr == ic_end) {
122         // reached the end of the table, so flag this to the other
123         // routines by setting next_id to an invalid value
124         next_id = std::numeric_limits<id_type>::max();
125       } else {
126         r_itr = ic_itr->begin();
127       }
128     }
129
130     // get the next ID unless the end of the table has been hit
131     if (next_id < std::numeric_limits<id_type>::max()) {
132       const id_type cur_id = next_id;
133       if (!(*r_itr)[0].to<id_type>(next_id)) {
134         throw std::runtime_error("Next ID is non-numeric.");
135       }
136       if (next_id < cur_id) {
137          ostringstream err;
138          err << "Postgres has given us a 'next' ID " << next_id 
139              << " which is less than the current ID " << cur_id
140              << ", either Postgres has failed, or there's a bug.";
141          throw std::runtime_error(err.str());
142       }
143     }
144   }
145
146   result::const_iterator r_itr;
147   id_type next_id;
148 };
149
150 /**
151  * extends the table stream to provide utility methods for getting
152  * tags out of the table.
153  */
154 struct tag_stream 
155   : public table_stream<int64_t> {
156
157    tag_stream(pqxx::work &x, const char *table, const char *type) 
158       : table_stream<int64_t>(x, query(table, type), "fetch_tags") {
159   }
160
161   bool get(int64_t id, struct keyval *kv) {
162     bool has_tags = find_id(id);
163     resetList(kv);
164     if (has_tags) {
165       while (id == next_id) {
166         addItem(kv, (*r_itr)[1].c_str(), (*r_itr)[2].c_str(), 0);
167         next();
168       }
169     }
170     return has_tags;
171   }
172
173 private:
174
175   string query(const char *table, const char *type) {
176     ostringstream ostr;
177     ostr << "select " << type << "_id, k, v from " << table << " order by " << type << "_id";
178     return ostr.str();
179   }
180 };
181
182 /**
183  * gets way nodes out of the stream and returns them in the 
184  * keyval struct that the output functions are expecting.
185  */
186 struct way_node_stream 
187   : public table_stream<int64_t> {
188
189   way_node_stream(pqxx::work &x) 
190     : table_stream<int64_t>(x, "select way_id, node_id from current_way_nodes "
191                             "ORDER BY way_id, sequence_id", "fetch_way_nodes") {
192   }
193
194   bool get(int64_t id, struct keyval *kv) {
195     bool has_nodes = find_id(id);
196     resetList(kv);
197     if (has_nodes) {
198       while (id == next_id) {
199         addItem(kv, "", (*r_itr)[1].c_str(), 0);
200         next();
201       }
202     }
203     return has_nodes;
204   }
205 };
206
207 /**
208  * gets relation members out of the stream and returns them in the 
209  * pair of keyval structs that the output functions are expecting.
210  */
211 struct relation_member_stream 
212   : public table_stream<int64_t> {
213
214   relation_member_stream(pqxx::work &x) 
215     : table_stream<int64_t>(x, "select relation_id, member_id, member_type, "
216                             "member_role from current_relation_members "
217                             "ORDER BY relation_id, sequence_id", "fetch_relation_members") {
218   }
219
220   bool get(int64_t id, struct keyval *members, struct keyval *roles) {
221     bool has_members = find_id(id);
222     resetList(members);
223     resetList(roles);
224     if (has_members) {
225       while (id == next_id) {
226         addItem(members, (*r_itr)[2].c_str(), (*r_itr)[1].c_str(), 0);
227         addItem(roles, "", (*r_itr)[3].c_str(), 0);
228         next();
229       }
230     }
231     return has_members;
232   }
233 };
234
235 void changesets(pqxx::work &xaction) {
236   // work around reformDate returning pointer to static storage :-(
237   static char created_at[64];
238   struct keyval tags;
239   initList(&tags);
240
241   ostringstream query;
242   query << "select id, user_id, created_at, closed_at, num_changes, "
243         << "min_lat, max_lat, min_lon, max_lon, closed_at > now() at time zone 'utc' as open "
244         << "from changesets c order by id";
245
246   icursorstream changesets(xaction, query.str(), "fetch_changesets", 1000);
247   tag_stream tagstream(xaction, "changeset_tags", "changeset");
248
249   const icursor_iterator ic_end;
250   int64_t prev_id = -1;
251   for (icursor_iterator ic_itr(changesets); ic_itr != ic_end; ++ic_itr) {
252     const pqxx::result &res = *ic_itr;
253     for (pqxx::result::const_iterator itr = res.begin();
254          itr != res.end(); ++itr) {
255       const pqxx::result::tuple &row = *itr;
256       const int64_t id = row[0].as<int64_t>();
257
258       if (id <= prev_id) {
259          ostringstream err;
260          err << "Postgres gave us changeset " << id << " after changeset "
261              << prev_id << ", which isn't ordered. This is a bug, either "
262              << "in Postgres or in planet dump.";
263          throw std::runtime_error(err.str());
264       }
265       prev_id = id;
266
267       const bool null_bbox = row[5].is_null() || row[6].is_null() || row[7].is_null() || row[8].is_null();
268
269       if (!tagstream.get(id, &tags)) {
270         resetList(&tags);
271       }
272
273       // work around reformDate returning pointer to static data
274       strncpy(created_at, reformDate(row[2].c_str()), 64);
275
276       osm_changeset(id,
277                     lookup_user(row[1].c_str()), // user_id
278                     created_at, // created_at
279                     reformDate(row[3].c_str()), // closed_at
280                     row[4].as<int>(), // num_changes
281                     null_bbox ? 0 : 1,
282                     null_bbox ? 0 : row[5].as<int>() / SCALE, // min_lat
283                     null_bbox ? 0 : row[6].as<int>() / SCALE, // max_lat
284                     null_bbox ? 0 : row[7].as<int>() / SCALE, // min_lon
285                     null_bbox ? 0 : row[8].as<int>() / SCALE, // max_lon
286                     row[9].as<bool>() ? 1 : 0, // open
287                     &tags);
288     }
289   }
290
291   resetList(&tags);
292 }
293
294 void nodes(pqxx::work &xaction) {
295   struct keyval tags;
296   initList(&tags);
297   
298   ostringstream query;
299   query << "select n.id, n.latitude, n.longitude, n.timestamp, "
300         << "c.user_id, n.version, n.changeset_id "
301         << "from current_nodes n join changesets c on n.changeset_id=c.id "
302         << "where n.visible = true order by n.id";
303   
304   icursorstream nodes(xaction, query.str(), "fetch_nodes", 1000);
305   tag_stream tagstream(xaction, "current_node_tags", "node");
306
307   const icursor_iterator ic_end;
308   int64_t prev_id = -1;
309   for (icursor_iterator ic_itr(nodes); ic_itr != ic_end; ++ic_itr) {
310     const pqxx::result &res = *ic_itr;
311     for (pqxx::result::const_iterator itr = res.begin();
312          itr != res.end(); ++itr) {
313       int64_t id;
314       int version, latitude, longitude, changeset;
315
316       if (!(*itr)[0].to<int64_t>(id)) {
317         throw std::runtime_error("Node ID is not numeric.");
318       }
319       if (!(*itr)[1].to<int>(latitude)) {
320         throw std::runtime_error("Latitude is not numeric.");
321       }
322       if (!(*itr)[2].to<int>(longitude)) {
323         throw std::runtime_error("Longitude is not numeric.");
324       }
325       if (!(*itr)[5].to<int>(version)) {
326         throw std::runtime_error("Version is not numeric.");
327       }
328       if (!(*itr)[6].to<int>(changeset)) {
329         throw std::runtime_error("Changeset ID is not numeric.");
330       }
331
332       if (id <= prev_id) {
333          ostringstream err;
334          err << "Postgres gave us node " << id << " after node "
335              << prev_id << ", which isn't ordered. This is a bug, either "
336              << "in Postgres or in planet dump.";
337          throw std::runtime_error(err.str());
338       }
339       prev_id = id;
340
341       if (!tagstream.get(id, &tags)) {
342         resetList(&tags);
343       }
344
345       osm_node(id, latitude / SCALE, longitude / SCALE, &tags, 
346                reformDate((*itr)[3].c_str()), 
347                lookup_user((*itr)[4].c_str()), 
348                version, changeset);
349     }
350   }
351
352   resetList(&tags);
353 }
354
355 void ways(pqxx::work &xaction) {
356   struct keyval tags, nodes;
357   initList(&tags);
358   initList(&nodes);
359   
360   ostringstream query;
361   query << "select w.id, w.timestamp, cs.user_id, w.version, "
362         << "w.changeset_id from current_ways w join changesets cs on "
363         << "w.changeset_id=cs.id where visible = true order by id";
364   
365   icursorstream ways(xaction, query.str(), "fetch_ways", 1000);
366   tag_stream tagstream(xaction, "current_way_tags", "way");
367   way_node_stream nodestream(xaction);
368
369   const icursor_iterator ic_end;
370   int64_t prev_id = -1;
371   for (icursor_iterator ic_itr(ways); ic_itr != ic_end; ++ic_itr) {
372     const pqxx::result &res = *ic_itr;
373     for (pqxx::result::const_iterator itr = res.begin();
374          itr != res.end(); ++itr) {
375       int64_t id;
376       int version, changeset;
377
378       if (!(*itr)[0].to<int64_t>(id)) {
379         throw std::runtime_error("Node ID is not numeric.");
380       }
381       if (!(*itr)[3].to<int>(version)) {
382         throw std::runtime_error("Version is not numeric.");
383       }
384       if (!(*itr)[4].to<int>(changeset)) {
385         throw std::runtime_error("Changeset ID is not numeric.");
386       }
387
388       if (id <= prev_id) {
389          ostringstream err;
390          err << "Postgres gave us way " << id << " after way "
391              << prev_id << ", which isn't ordered. This is a bug, either "
392              << "in Postgres or in planet dump.";
393          throw std::runtime_error(err.str());
394       }
395       prev_id = id;
396
397       tagstream.get(id, &tags);
398       nodestream.get(id, &nodes);
399
400       osm_way(id, &nodes, &tags, 
401               reformDate((*itr)[1].c_str()), 
402               lookup_user((*itr)[2].c_str()), 
403               version, changeset);
404     }
405   }
406
407   resetList(&tags);
408   resetList(&nodes);
409 }
410
411 void relations(pqxx::work &xaction) {
412   struct keyval tags, members, roles;
413   initList(&tags);
414   initList(&members);
415   initList(&roles);
416   
417   ostringstream query;
418   query << "select r.id, r.timestamp, c.user_id, r.version, r.changeset_id "
419         << "from current_relations r join changesets c on "
420         << "r.changeset_id=c.id where visible = true ORDER BY id";
421   
422   icursorstream relations(xaction, query.str(), "fetch_relations", 1000);
423   tag_stream tagstream(xaction, "current_relation_tags", "relation");
424   relation_member_stream memstream(xaction);
425
426   const icursor_iterator ic_end;
427   int64_t prev_id = -1;
428   for (icursor_iterator ic_itr(relations); ic_itr != ic_end; ++ic_itr) {
429     const pqxx::result &res = *ic_itr;
430     for (pqxx::result::const_iterator itr = res.begin();
431          itr != res.end(); ++itr) {
432       int64_t id;
433       int version, changeset;
434
435       if (!(*itr)[0].to<int64_t>(id)) {
436         throw std::runtime_error("Relation ID is not numeric.");
437       }
438       if (!(*itr)[3].to<int>(version)) {
439         throw std::runtime_error("Version is not numeric.");
440       }
441       if (!(*itr)[4].to<int>(changeset)) {
442         throw std::runtime_error("Changeset ID is not numeric.");
443       }
444
445       if (id <= prev_id) {
446          ostringstream err;
447          err << "Postgres gave us relation " << id << " after relation "
448              << prev_id << ", which isn't ordered. This is a bug, either "
449              << "in Postgres or in planet dump.";
450          throw std::runtime_error(err.str());
451       }
452       prev_id = id;
453
454       tagstream.get(id, &tags);
455       memstream.get(id, &members, &roles);
456
457       osm_relation(id, &members, &roles, &tags, 
458                    reformDate((*itr)[1].c_str()), 
459                    lookup_user((*itr)[2].c_str()), 
460                    version, changeset);
461     }
462   }
463
464   resetList(&tags);
465   resetList(&members);
466   resetList(&roles);
467 }
468
469 int main(int argc, char **argv)
470 {
471   int i;
472   int want_nodes, want_ways, want_relations, want_changesets;
473     
474   if (argc == 1)
475     want_nodes = want_ways = want_relations = want_changesets = 1;
476   else {
477     want_nodes = want_ways = want_relations = want_changesets = 0;
478     for(i=1; i<argc; i++) {
479       if (!strcmp(argv[i], "--nodes"))
480         want_nodes = 1;
481       else if (!strcmp(argv[i], "--ways"))
482         want_ways = 1;
483       else if (!strcmp(argv[i], "--relations"))
484         want_relations = 1;
485       else if (!strcmp(argv[i], "--changesets"))
486         want_changesets = 1;
487       else {
488         fprintf(stderr, "Usage error:\n");
489         fprintf(stderr, "\t%s [--nodes] [--ways] [--relations] [--changesets]\n\n", argv[0]);
490         fprintf(stderr, "Writes OSM planet dump to STDOUT. If no flags are specified then nodes, ways and\n");
491         fprintf(stderr, "relations are output. If one or more flags are set then only the requested data\n");
492         fprintf(stderr, "is dumped.\n");
493         exit(2);
494       }
495     }
496   }
497
498   char *connection_params = NULL;
499   if ((connection_params = getenv("CONNECTION_PARAMS")) == NULL) {
500     fprintf(stderr, "ERROR: you must set the $CONNECTION_PARAMS environment "
501             "variable to the appropriate connection parameters.\n");
502     exit(2);
503   }
504
505   try {
506     // open database connection
507     pqxx::connection conn(connection_params);
508     pqxx::work xaction(conn);
509     xaction.exec("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY");
510
511     fetch_users(xaction);
512
513     osm_header();
514     
515     if (want_changesets)
516       changesets(xaction);
517
518     if (want_nodes)
519       nodes(xaction);
520
521     if (want_ways)
522       ways(xaction);
523     
524     if (want_relations)
525       relations(xaction);
526
527     osm_footer();
528     
529     free_users();
530
531     // rollback happens automatically here
532   } catch (const std::exception &e) {
533     fprintf(stderr, "ERROR: %s\n", e.what());
534     return 1;
535
536   } catch (...) {
537     fprintf(stderr, "UNKNOWN ERROR!\n");
538     return 1;
539   }
540
541   return 0;
542 }