]> git.openstreetmap.org Git - chef.git/blob - cookbooks/planet/files/default/replication-bin/replicate-changesets
Fix some rubocop style issues
[chef.git] / cookbooks / planet / files / default / replication-bin / replicate-changesets
1 #!/usr/bin/ruby
2
3 require "rubygems"
4 require "pg"
5 require "yaml"
6 require "time"
7 require "fileutils"
8 require "xml/libxml"
9 require "zlib"
10
# a changeset holding at least this many changes is treated as closed
CHANGES_LIMIT = 50_000

# lat/lon values are stored in the database as scaled integers; dividing a
# stored value by this factor recovers the floating point lat/lon
GEO_SCALE = 10_000_000
16
##
# Minimal in-memory view of one changeset row fetched from the database:
# its id, creation/closing timestamps and change count. That is enough to
# tell whether a changeset is open or closed at a given instant, and whether
# it was created or closed within a given time window.
class Changeset
  attr_reader :id, :created_at, :closed_at, :num_changes

  # row is indexed with the string keys "id", "created_at", "closed_at" and
  # "num_changes"; the numeric columns go through to_i and the timestamps
  # through Time.parse.
  #
  # NOTE(review): Time.parse interprets a timestamp that carries no zone in
  # the process's local zone - presumably the host runs in UTC; confirm.
  def initialize(row)
    @id = row["id"].to_i
    @created_at, @closed_at = %w[created_at closed_at].map { |column| Time.parse(row[column]) }
    @num_changes = row["num_changes"].to_i
  end

  # true when the closing time lies strictly before t, or when the changeset
  # has accumulated CHANGES_LIMIT changes or more.
  def closed?(t)
    return true if @closed_at < t

    @num_changes >= CHANGES_LIMIT
  end

  # exact negation of closed?
  def open?(t)
    !closed?(t)
  end

  # true when the closing time or the creation time falls inside the
  # half-open interval [t1, t2).
  def activity_between?(t1, t2)
    [@closed_at, @created_at].any? { |stamp| stamp >= t1 && stamp < t2 }
  end
end
43
##
# state and connections associated with getting changeset data
# replicated to a file.
#
# The configuration file is YAML; the keys read here are "state_file" (path
# of the YAML state file), "db" (passed as-is to PG.connect) and "data_dir"
# (root directory of the generated replication files). The state file holds
# "last_run" (time of the previous run) and "sequence" (number of the last
# file written).
class Replicator
  # result columns that together describe a changeset's bounding box, in
  # the order the attributes are written to the XML.
  BBOX_COLUMNS = %w[min_lat max_lat min_lon max_lon].freeze

  # config is the path of the YAML configuration file.
  def initialize(config)
    @config = load_yaml(config)
    # an empty state file parses to nil/false; treat that as a first run
    # instead of failing later on @state["last_run"].
    @state = load_yaml(@config["state_file"]) || {}
    # PG.connect instead of the deprecated PGconn alias, which newer pg
    # releases no longer define.
    @conn = PG.connect(@config["db"])
    @now = Time.now.getutc
  end

  # Returns the Changeset objects with activity between the previous run
  # (or one minute ago when there is no recorded previous run) and now.
  # Side effect: records the current time as the new "last_run" in @state.
  def open_changesets
    last_run = @state["last_run"] || (@now - 60)
    @state["last_run"] = @now
    # pretty much all operations on a changeset will modify its closed_at
    # time (see rails_port's changeset model). so it is probably enough
    # for us to look at anything that was closed recently, and filter from
    # there.
    @conn
      .exec("select id, created_at, closed_at, num_changes from changesets where closed_at > ((now() at time zone 'utc') - '1 hour'::interval)")
      .map { |row| Changeset.new(row) }
      .select { |cs| cs.activity_between?(last_run, @now) }
  end

  # creates an XML document containing the changeset information from the
  # list of changesets output by open_changesets, and returns it as a string.
  def changeset_dump(changesets)
    doc = XML::Document.new
    doc.root = XML::Node.new("osm")
    { "version" => "0.6",
      "generator" => "replicate_changesets.rb",
      "copyright" => "OpenStreetMap and contributors",
      "attribution" => "http://www.openstreetmap.org/copyright",
      "license" => "http://opendatacommons.org/licenses/odbl/1-0/" }
      .each { |k, v| doc.root[k] = v }

    changesets.each { |cs| doc.root << changeset_node(cs) }

    doc.to_s
  end

  # saves new state (including the changeset dump xml). The state file is
  # held under an exclusive flock for the duration of the update. On failure
  # a message is printed to STDERR and the error is re-raised.
  def save!
    File.open(@config["state_file"], "r") do |fl|
      fl.flock(File::LOCK_EX)
      begin
        write_next_sequence
      rescue
        STDERR.puts("Error! Couldn't update state.")
        raise
      ensure
        # release the lock on success and on every kind of failure
        fl.flock(File::LOCK_UN)
      end
    end
  end

  private

  # Parses the YAML file at path. Time must be permitted because the state
  # file stores "last_run" as a timestamp; Symbol is permitted so symbols in
  # an existing configuration keep loading.
  # NOTE: the permitted_classes keyword needs Psych >= 3.1 (Ruby 2.6+).
  def load_yaml(path)
    YAML.safe_load(File.read(path), permitted_classes: [Time, Symbol])
  end

  # Builds the <changeset> element for one changeset: core attributes, user
  # details, bounding box (only when all four bounds are present) and tags.
  # cs.id is an Integer, so interpolating it into the SQL is safe.
  def changeset_node(cs)
    xml = XML::Node.new("changeset")
    xml["id"] = cs.id.to_s
    xml["created_at"] = cs.created_at.getutc.xmlschema
    xml["closed_at"] = cs.closed_at.getutc.xmlschema if cs.closed?(@now)
    xml["open"] = cs.open?(@now).to_s
    xml["num_changes"] = cs.num_changes.to_s

    res = @conn.exec("select u.id, u.display_name, c.min_lat, c.max_lat, c.min_lon, c.max_lon from users u join changesets c on u.id=c.user_id where c.id=#{cs.id}")
    info = res[0]
    xml["user"] = info["display_name"]
    xml["uid"] = info["id"]

    if BBOX_COLUMNS.none? { |column| info[column].nil? }
      BBOX_COLUMNS.each { |column| xml[column] = (info[column].to_f / GEO_SCALE).to_s }
    end

    @conn.exec("select k, v from changeset_tags where changeset_id=#{cs.id}").each do |row|
      tag = XML::Node.new("tag")
      tag["k"] = row["k"]
      tag["v"] = row["v"]
      xml << tag
    end

    xml
  end

  # Writes the next numbered replication file and the updated state file.
  # Both are written to temporary names and then moved into place, to avoid
  # in-progress clashes, or people seeing incomplete files.
  def write_next_sequence
    sequence = @state.key?("sequence") ? @state["sequence"] + 1 : 0
    data_file = @config["data_dir"] + format("/%03d/%03d/%03d.osm.gz", sequence / 1_000_000, (sequence / 1000) % 1000, sequence % 1000)
    tmp_state = @config["state_file"] + ".tmp"
    # keep the temporary file beside its destination: the final move is then
    # a rename within one directory rather than a copy from /tmp, and no
    # fixed, shared /tmp file name is used.
    tmp_data = data_file + ".tmp"

    begin
      FileUtils.mkdir_p(File.dirname(data_file))
      Zlib::GzipWriter.open(tmp_data) do |fh|
        fh.write(changeset_dump(open_changesets))
      end
      @state["sequence"] = sequence
      File.open(tmp_state, "w") do |fh|
        fh.write(YAML.dump(@state))
      end
      FileUtils.mv(tmp_data, data_file)
      FileUtils.mv(tmp_state, @config["state_file"])
    rescue StandardError
      # do not leave half-written temporary files behind
      FileUtils.rm_f([tmp_data, tmp_state])
      raise
    end
  end
end
150
# Entry point: the single command line argument is the path of the YAML
# configuration file. Exit with a usage message when it is missing, rather
# than failing inside File.read with a TypeError.
abort("Usage: #{$PROGRAM_NAME} <config.yml>") if ARGV.empty?

Replicator.new(ARGV[0]).save!