class Hydra::Worker

Hydra class responsible to dispatching runners and communicating with the master.

The Worker is never run directly by a user. Workers are created by a Master to delegate to Runners.

The general convention is to have one Worker per machine on a distributed network.

Attributes

runners[R]

Public Class Methods

new(opts = {}) click to toggle source

Create a new worker.

  • io: The IO object to use to communicate with the master

  • num_runners: The number of runners to launch

# File lib/hydra/worker.rb, line 17
def initialize(opts = {})
  @verbose = opts.fetch(:verbose) { false }
  @io = opts.fetch(:io) { raise "No IO Object" }
  @runners = []
  @listeners = []
  @options = opts.fetch(:options) { "" }

  load_worker_initializer

  @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil })
  @runner_event_listeners.select{|l| l.is_a? String}.each do |l|
    @runner_event_listeners.delete_at(@runner_event_listeners.index(l))
    listener = eval(l)
    @runner_event_listeners << listener if listener.is_a?(Hydra::RunnerListener::Abstract)
  end
  @runner_log_file = opts.fetch(:runner_log_file) { nil }

  boot_runners(opts.fetch(:runners) { 1 })
  @io.write(Hydra::Messages::Worker::WorkerBegin.new)

  process_messages

  @runners.each{|r| Process.wait r[:pid] }
end

Public Instance Methods

delegate_file(message) click to toggle source

When the master sends a file down to the worker, it hits this method. Then the worker delegates the file down to a runner.

# File lib/hydra/worker.rb, line 62
def delegate_file(message)
  runner = idle_runner
  runner[:idle] = false
  runner[:io].write(RunFile.new(eval(message.serialize)))
end
load_worker_initializer() click to toggle source
# File lib/hydra/worker.rb, line 42
def load_worker_initializer
  if File.exist?('./hydra_worker_init.rb')
    trace('Requiring hydra_worker_init.rb')
    require 'hydra_worker_init'
  else
    trace('hydra_worker_init.rb not present')
  end
end
relay_results(message, runner) click to toggle source

When a runner finishes, it sends the results up to the worker. Then the worker sends the results up to the master.

# File lib/hydra/worker.rb, line 70
def relay_results(message, runner)
  runner[:idle] = true
  @io.write(Results.new(eval(message.serialize)))
end
request_file(message, runner) click to toggle source

When a runner wants a file, it hits this method with a message. Then the worker bubbles the file request up to the master.

# File lib/hydra/worker.rb, line 55
def request_file(message, runner)
  @io.write(RequestFile.new)
  runner[:idle] = true
end
shutdown() click to toggle source

When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.

# File lib/hydra/worker.rb, line 77
def shutdown
  @running = false
  trace "Notifying #{@runners.size} Runners of Shutdown"
  @runners.each do |r|
    trace "Sending Shutdown to Runner"
    trace "\t#{r.inspect}"
    r[:io].write(Shutdown.new)
  end
  Thread.exit
end

Private Instance Methods

process_messages_from_master() click to toggle source
# File lib/hydra/worker.rb, line 120
def process_messages_from_master
  @listeners << Thread.new do
    while @running
      begin
        message = @io.gets
        if message and !message.class.to_s.index("Master").nil?
          trace "Received Message from Master"
          trace "\t#{message.inspect}"
          message.handle(self)
        else
          trace "Nothing from Master, Pinging"
          @io.write Ping.new
        end
      rescue IOError => ex
        trace "Worker lost Master"
        shutdown
      end
    end
  end
end
process_messages_from_runners() click to toggle source
# File lib/hydra/worker.rb, line 141
def process_messages_from_runners
  @runners.each do |r|
    @listeners << Thread.new do
      while @running
        begin
          message = r[:io].gets
          if message and !message.class.to_s.index("Runner").nil?
            trace "Received Message from Runner"
            trace "\t#{message.inspect}"
            message.handle(self, r)
          end
        rescue IOError => ex
          trace "Worker lost Runner [#{r.inspect}]"
          Thread.exit
        end
      end
    end
  end
end