Bring changeset replication task under chef control
authorTom Hughes <tom@compton.nu>
Wed, 3 Jul 2013 21:54:46 +0000 (22:54 +0100)
committerTom Hughes <tom@compton.nu>
Wed, 3 Jul 2013 21:54:46 +0000 (22:54 +0100)
cookbooks/planet/recipes/replication.rb
cookbooks/planet/templates/default/changesets.bin.erb [new file with mode: 0644]
cookbooks/planet/templates/default/changesets.conf.erb [new file with mode: 0644]
cookbooks/planet/templates/default/replication.cron.erb

index f055280f1e92ea294fc6cac462f513863c5bd549..2d9dc4f4b631839f34ca9d485ba27b5d98c87dd8 100644 (file)
@@ -21,6 +21,18 @@ include_recipe "osmosis"
 
 db_passwords = data_bag_item("db", "passwords")
 
+# Runtime dependencies for the replicate-changesets script installed
+# below: ruby itself, libxml for the XML output, and the pg gem for
+# database access.
+package "ruby"
+package "ruby-libxml"
+
+gem_package "pg"
+
+template "/usr/local/bin/replicate-changesets" do
+  source "changesets.bin.erb"
+  owner "root"
+  group "root"
+  mode 0755
+end
+
 directory "/etc/replication" do
   owner "root"
   group "root"
@@ -35,6 +47,14 @@ template "/etc/replication/auth.conf" do
   variables :password => db_passwords["planetdiff"]
 end
 
+# Configuration for the changeset replicator; readable by the planet
+# group only since it carries the database password.
+template "/etc/replication/changesets.conf" do
+  source "changesets.conf.erb"
+  owner "root"
+  group "planet"
+  mode 0640
+  variables :password => db_passwords["planetdiff"]
+end
+
 directory "/var/lib/replication" do
   owner "planet"
   group "planet"
diff --git a/cookbooks/planet/templates/default/changesets.bin.erb b/cookbooks/planet/templates/default/changesets.bin.erb
new file mode 100644 (file)
index 0000000..443d8ed
--- /dev/null
@@ -0,0 +1,152 @@
+#!/usr/bin/ruby
+
+require 'rubygems'
+require 'pg'
+require 'yaml'
+require 'time'
+require 'fileutils'
+require 'xml/libxml'
+require 'zlib'
+
+# after this many changes, a changeset will be closed
+CHANGES_LIMIT=50000
+
+# this is the scale factor for lat/lon values stored as integers in the database
+GEO_SCALE=10000000
+
+##
+# changeset class keeps some information about changesets downloaded from the
+# database - enough to let us know which changesets are closed/open & recently
+# closed.
+class Changeset
+  attr_reader :id, :created_at, :closed_at, :num_changes
+
+  # row: a single result row from the pg driver, a hash of string
+  # keys to string values as selected in Replicator#open_changesets.
+  def initialize(row)
+    @id = row['id'].to_i
+    @created_at = Time.parse(row['created_at'])
+    @closed_at = Time.parse(row['closed_at'])
+    @num_changes = row['num_changes'].to_i
+  end
+
+  # a changeset counts as closed at time t if its closed_at time has
+  # passed, or it has reached the change limit (after which it will
+  # be closed - see CHANGES_LIMIT above).
+  def closed?(t)
+    (@closed_at < t) || (@num_changes >= CHANGES_LIMIT)
+  end
+
+  def open?(t)
+    not closed?(t)
+  end
+
+  # true if the changeset was created or closed within the
+  # half-open interval [t1, t2).
+  def activity_between?(t1, t2)
+    ((@closed_at >= t1) && (@closed_at < t2)) || ((@created_at >= t1) && (@created_at < t2))
+  end
+end
+
+##
+# state and connections associated with getting changeset data
+# replicated to a file.
+class Replicator
+  # config: path to a YAML file providing 'state_file', 'db' and
+  # 'data_dir' keys (see changesets.conf.erb in this commit).
+  def initialize(config)
+    @config = YAML.load(File.read(config))
+    @state = YAML.load(File.read(@config['state_file']))
+    @conn = PGconn.connect(@config['db'])
+    # fix "now" once so the whole run works from a consistent cutoff
+    @now = Time.now.getutc
+  end
+
+  # returns the Changesets which saw activity between the previous
+  # run (recorded in the state file) and now. also advances
+  # @state['last_run'] - the new state is only persisted by save!.
+  def open_changesets
+    last_run = @state['last_run']
+    # first ever run: default to looking back one minute
+    last_run = (@now - 60) if last_run.nil?
+    @state['last_run'] = @now
+    # pretty much all operations on a changeset will modify its closed_at
+    # time (see rails_port's changeset model). so it is probably enough 
+    # for us to look at anything that was closed recently, and filter from
+    # there.
+    # NOTE(review): if the replicator is down for more than an hour,
+    # activity older than the 1 hour window would be missed - confirm
+    # this window is generous enough for the cron cadence.
+    @conn.
+      exec("select id, created_at, closed_at, num_changes from changesets where closed_at > ((now() at time zone 'utc') - '1 hour'::interval)").
+      map {|row| Changeset.new(row) }.
+      select {|cs| cs.activity_between?(last_run, @now) }
+  end
+
+  # creates an XML file containing the changeset information from the 
+  # list of changesets output by open_changesets.
+  def changeset_dump(changesets)
+    doc = XML::Document.new
+    doc.root = XML::Node.new("osm")
+    # standard OSM document metadata, set as attributes on the root
+    # <osm> element.
+    { 'version' => '0.6',
+      'generator' => 'replicate_changesets.rb',
+      'copyright' => "OpenStreetMap and contributors",
+      'attribution' => "http://www.openstreetmap.org/copyright",
+      'license' => "http://opendatacommons.org/licenses/odbl/1-0/" }.
+      each { |k,v| doc.root[k] = v }
+
+    changesets.each do |cs|
+      xml = XML::Node.new("changeset")
+      xml['id'] = cs.id.to_s
+      xml['created_at'] = cs.created_at.getutc.xmlschema
+      # closed_at is only emitted once the changeset is actually closed
+      xml['closed_at'] = cs.closed_at.getutc.xmlschema if cs.closed?(@now)
+      xml['open'] = cs.open?(@now).to_s
+
+      # cs.id is an integer (coerced in Changeset#initialize), so the
+      # interpolation below is safe from SQL injection.
+      res = @conn.exec("select u.id, u.display_name, c.min_lat, c.max_lat, c.min_lon, c.max_lon from users u join changesets c on u.id=c.user_id where c.id=#{cs.id}")
+      xml['user'] = res[0]['display_name']
+      xml['uid'] = res[0]['id']
+
+      # bounding box attributes are only present when the changeset
+      # contains geometry; lat/lon are stored as scaled integers.
+      unless (res[0]['min_lat'].nil? ||
+              res[0]['max_lat'].nil? ||
+              res[0]['min_lon'].nil? ||
+              res[0]['max_lon'].nil?)
+        xml['min_lat'] = (res[0]['min_lat'].to_f / GEO_SCALE).to_s
+        xml['max_lat'] = (res[0]['max_lat'].to_f / GEO_SCALE).to_s
+        xml['min_lon'] = (res[0]['min_lon'].to_f / GEO_SCALE).to_s
+        xml['max_lon'] = (res[0]['max_lon'].to_f / GEO_SCALE).to_s
+      end
+
+      res = @conn.exec("select k, v from changeset_tags where changeset_id=#{cs.id}")
+      res.each do |row|
+        tag = XML::Node.new("tag")
+        tag['k'] = row['k']
+        tag['v'] = row['v']
+        xml << tag
+      end
+
+      doc.root << xml
+    end
+    
+    doc.to_s
+  end
+
+  # saves new state (including the changeset dump xml)
+  def save!
+    # the state file is opened read-only purely to hold an exclusive
+    # flock for the duration of the update, serialising concurrent
+    # cron invocations against each other.
+    File.open(@config['state_file'], "r") do |fl|
+      fl.flock(File::LOCK_EX)
+
+      # sequence numbers map onto a three-level aaa/bbb/ccc.osm.gz
+      # directory layout, a thousand entries per level.
+      sequence = (@state.has_key?('sequence') ? @state['sequence'] + 1 : 0)
+      data_file = @config['data_dir'] + sprintf("/%03d/%03d/%03d.osm.gz", sequence / 1000000, (sequence / 1000) % 1000, (sequence % 1000));
+      tmp_state = @config['state_file'] + ".tmp"
+      tmp_data = "/tmp/changeset_data.osm.tmp"
+      # try and write the files to tmp locations and then
+      # move them into place later, to avoid in-progress
+      # clashes, or people seeing incomplete files.
+      # NOTE(review): /tmp may be a different filesystem from
+      # data_dir, so the mv below can be a copy+delete rather than an
+      # atomic rename - confirm this is acceptable for readers.
+      begin
+        FileUtils.mkdir_p(File.dirname(data_file))
+        Zlib::GzipWriter.open(tmp_data) do |fh|
+          fh.write(changeset_dump(open_changesets))
+        end
+        @state['sequence'] = sequence
+        File.open(tmp_state, "w") do |fh|
+          fh.write(YAML.dump(@state))
+        end
+        # data first, then state: a crash between the two leaves the
+        # sequence un-bumped so the data file is rewritten next run.
+        FileUtils.mv(tmp_data, data_file)
+        FileUtils.mv(tmp_state, @config['state_file'])
+        fl.flock(File::LOCK_UN)
+
+      rescue
+        STDERR.puts("Error! Couldn't update state.")
+        fl.flock(File::LOCK_UN)
+        raise
+      end
+    end
+  end
+end
+
+# entry point: ARGV[0] is the path to the YAML configuration file
+# (installed as /etc/replication/changesets.conf by the recipe above).
+rep = Replicator.new(ARGV[0])
+rep.save!
+
diff --git a/cookbooks/planet/templates/default/changesets.conf.erb b/cookbooks/planet/templates/default/changesets.conf.erb
new file mode 100644 (file)
index 0000000..72ac2a0
--- /dev/null
@@ -0,0 +1,3 @@
+state_file: /store/planet/replication/changesets/state.yaml
+db: host=db dbname=openstreetmap user=planetdiff password=<%= @password %>
+data_dir: /store/planet/replication/changesets
index 6993b1505a0f4e4971b8b52b59e33bad4c99476f..dead114ef360612742a7d18ec011b5bdd2259cf6 100644 (file)
@@ -1,8 +1,13 @@
 # DO NOT EDIT - This file is being maintained by Chef
 
-MAILTO=brett@bretth.com
 TZ=UTC
 
+MAILTO=brett@bretth.com
+
 * * * * * planet /usr/local/bin/osmosis -q --replicate-apidb authFile=/etc/replication/auth.conf validateSchemaVersion=false --write-replication workingDirectory=/store/planet/replication/minute
 2 * * * * planet /usr/local/bin/osmosis -q --merge-replication-files workingDirectory=/var/lib/replication/hour
 5 * * * * planet /usr/local/bin/osmosis -q --merge-replication-files workingDirectory=/var/lib/replication/day
+
+MAILTO=zerebubuth@gmail.com
+
+* * * * * planet /usr/local/bin/replicate-changesets /etc/replication/changesets.conf