Manage the replication directory
[chef.git] / cookbooks / planet / files / default / replication-bin / replicate-changesets
1 #!/usr/bin/ruby
2
3 require 'rubygems'
4 require 'pg'
5 require 'yaml'
6 require 'time'
7 require 'fileutils'
8 require 'xml/libxml'
9 require 'zlib'
10
11 # after this many changes, a changeset will be closed
12 CHANGES_LIMIT=50000
13
14 # this is the scale factor for lat/lon values stored as integers in the database
15 GEO_SCALE=10000000
16
17 ##
18 # changeset class keeps some information about changesets downloaded from the
19 # database - enough to let us know which changesets are closed/open & recently
20 # closed.
21 class Changeset
22   attr_reader :id, :created_at, :closed_at, :num_changes
23
24   def initialize(row)
25     @id = row['id'].to_i
26     @created_at = Time.parse(row['created_at'])
27     @closed_at = Time.parse(row['closed_at'])
28     @num_changes = row['num_changes'].to_i
29   end
30
31   def closed?(t)
32     (@closed_at < t) || (@num_changes >= CHANGES_LIMIT)
33   end
34
35   def open?(t)
36     not closed?(t)
37   end
38
39   def activity_between?(t1, t2)
40     ((@closed_at >= t1) && (@closed_at < t2)) || ((@created_at >= t1) && (@created_at < t2))
41   end
42 end
43
44 ##
45 # state and connections associated with getting changeset data
46 # replicated to a file.
47 class Replicator
48   def initialize(config)
49     @config = YAML.load(File.read(config))
50     @state = YAML.load(File.read(@config['state_file']))
51     @conn = PGconn.connect(@config['db'])
52     @now = Time.now.getutc
53   end
54
55   def open_changesets
56     last_run = @state['last_run']
57     last_run = (@now - 60) if last_run.nil?
58     @state['last_run'] = @now
59     # pretty much all operations on a changeset will modify its closed_at
60     # time (see rails_port's changeset model). so it is probably enough 
61     # for us to look at anything that was closed recently, and filter from
62     # there.
63     @conn.
64       exec("select id, created_at, closed_at, num_changes from changesets where closed_at > ((now() at time zone 'utc') - '1 hour'::interval)").
65       map {|row| Changeset.new(row) }.
66       select {|cs| cs.activity_between?(last_run, @now) }
67   end
68
69   # creates an XML file containing the changeset information from the 
70   # list of changesets output by open_changesets.
71   def changeset_dump(changesets)
72     doc = XML::Document.new
73     doc.root = XML::Node.new("osm")
74     { 'version' => '0.6',
75       'generator' => 'replicate_changesets.rb',
76       'copyright' => "OpenStreetMap and contributors",
77       'attribution' => "http://www.openstreetmap.org/copyright",
78       'license' => "http://opendatacommons.org/licenses/odbl/1-0/" }.
79       each { |k,v| doc.root[k] = v }
80
81     changesets.each do |cs|
82       xml = XML::Node.new("changeset")
83       xml['id'] = cs.id.to_s
84       xml['created_at'] = cs.created_at.getutc.xmlschema
85       xml['closed_at'] = cs.closed_at.getutc.xmlschema if cs.closed?(@now)
86       xml['open'] = cs.open?(@now).to_s
87
88       res = @conn.exec("select u.id, u.display_name, c.min_lat, c.max_lat, c.min_lon, c.max_lon from users u join changesets c on u.id=c.user_id where c.id=#{cs.id}")
89       xml['user'] = res[0]['display_name']
90       xml['uid'] = res[0]['id']
91
92       unless (res[0]['min_lat'].nil? ||
93               res[0]['max_lat'].nil? ||
94               res[0]['min_lon'].nil? ||
95               res[0]['max_lon'].nil?)
96         xml['min_lat'] = (res[0]['min_lat'].to_f / GEO_SCALE).to_s
97         xml['max_lat'] = (res[0]['max_lat'].to_f / GEO_SCALE).to_s
98         xml['min_lon'] = (res[0]['min_lon'].to_f / GEO_SCALE).to_s
99         xml['max_lon'] = (res[0]['max_lon'].to_f / GEO_SCALE).to_s
100       end
101
102       res = @conn.exec("select k, v from changeset_tags where changeset_id=#{cs.id}")
103       res.each do |row|
104         tag = XML::Node.new("tag")
105         tag['k'] = row['k']
106         tag['v'] = row['v']
107         xml << tag
108       end
109
110       doc.root << xml
111     end
112     
113     doc.to_s
114   end
115
116   # saves new state (including the changeset dump xml)
117   def save!
118     File.open(@config['state_file'], "r") do |fl|
119       fl.flock(File::LOCK_EX)
120
121       sequence = (@state.has_key?('sequence') ? @state['sequence'] + 1 : 0)
122       data_file = @config['data_dir'] + sprintf("/%03d/%03d/%03d.osm.gz", sequence / 1000000, (sequence / 1000) % 1000, (sequence % 1000));
123       tmp_state = @config['state_file'] + ".tmp"
124       tmp_data = "/tmp/changeset_data.osm.tmp"
125       # try and write the files to tmp locations and then
126       # move them into place later, to avoid in-progress
127       # clashes, or people seeing incomplete files.
128       begin
129         FileUtils.mkdir_p(File.dirname(data_file))
130         Zlib::GzipWriter.open(tmp_data) do |fh|
131           fh.write(changeset_dump(open_changesets))
132         end
133         @state['sequence'] = sequence
134         File.open(tmp_state, "w") do |fh|
135           fh.write(YAML.dump(@state))
136         end
137         FileUtils.mv(tmp_data, data_file)
138         FileUtils.mv(tmp_state, @config['state_file'])
139         fl.flock(File::LOCK_UN)
140
141       rescue
142         STDERR.puts("Error! Couldn't update state.")
143         fl.flock(File::LOCK_UN)
144         raise
145       end
146     end
147   end
148 end
149
150 rep = Replicator.new(ARGV[0])
151 rep.save!
152