summaryrefslogtreecommitdiff
path: root/vendor/rsync_command/lib/rsync_command.rb
blob: bdcafe0a8d92299272a407d76fedd23f9b8407d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
require "rsync_command/version"
require "rsync_command/ssh_options"
require "rsync_command/thread_pool"

require 'monitor'

class RsyncRunner
  attr_accessor :logger
  attr_accessor :source, :dest, :flags, :includes, :excludes
  attr_accessor :user, :host
  attr_accessor :chdir, :ssh
  def initialize(rsync_command)
    @logger = nil
    @source = ""
    @dest   = ""
    @flags  = ""
    @includes = []
    @excludes = []
    @rsync_command = rsync_command
  end
  def log(*args)
    @logger.log(*args)
  end
  def valid?
    !@source.empty? || !@dest.empty?
  end
  def to_hash
    fields = [:flags, :includes, :excludes, :logger, :ssh, :chdir]
    fields.inject({}){|hsh, i|
      hsh[i] = self.send(i); hsh
    }
  end
  def exec
    return unless valid?
    dest = {
      :user => self.user,
      :host => self.host,
      :path => self.dest
    }
    src = self.source
    @rsync_command.exec_rsync(src, dest, self.to_hash)
  end
end

class RsyncCommand
  attr_accessor :failures, :logger

  def initialize(options={})
    @options = options.dup
    @logger = @options.delete(:logger)
    @flags = @options.delete(:flags)
    @failures = []
    @failures.extend(MonitorMixin)
  end

  #
  # takes an Enumerable and iterates each item in the list in parallel.
  #
  def asynchronously(array, &block)
    pool = ThreadPool.new
    array.each do |item|
      pool.schedule(RsyncRunner.new(self), item, &block)
    end
    pool.shutdown
  end

  #
  # returns true if last exec returned a failure
  #
  def failed?
    @failures && @failures.any?
  end

  #
  # runs rsync, recording failures
  #
  def exec_rsync(src, dest, options={})
    logger = options[:logger] || @logger
    @failures.synchronize do
      @failures.clear
    end
    rsync_cmd = command(src, dest, options)
    if options[:chdir]
      rsync_cmd = "cd '#{options[:chdir]}'; #{rsync_cmd}"
    end
    logger.debug rsync_cmd if logger
    ok = system(rsync_cmd)
    unless ok
      @failures.synchronize do
        @failures << {:source => src, :dest => dest, :options => options.dup}
      end
    end
  end

  #
  # build rsync command
  #
  def command(src, dest, options={})
    src = remote_address(src)
    dest = remote_address(dest)
    options = @options.merge(options)
    flags = []
    flags << @flags if @flags
    flags << options[:flags] if options.has_key?(:flags)
    flags << '--delete' if options[:delete]
    flags << includes(options[:includes]) if options.has_key?(:includes)
    flags << excludes(options[:excludes]) if options.has_key?(:excludes)
    flags << SshOptions.new(options[:ssh]).to_flags if options.has_key?(:ssh)
    "rsync #{flags.compact.join(' ')} #{src} #{dest}"
  end

  #
  # Creates an rsync location if the +address+ is a hash with keys :user, :host, and :path
  # (each component is optional). If +address+ is a string, we just pass it through.
  #
  def remote_address(address)
    if address.is_a? String
      address # assume it is already formatted.
    elsif address.is_a? Hash
      [[address[:user], address[:host]].compact.join('@'), address[:path]].compact.join(':')
    end
  end

  def excludes(patterns)
    [patterns].flatten.compact.map { |p| "--exclude='#{p}'" }
  end

  def includes(patterns)
    [patterns].flatten.compact.map { |p| "--include='#{p}'" }
  end

end