Hydra class responsible to dispatching runners and communicating with the master.
The Worker is never run directly by a user. Workers are created by a Master to delegate to Runners.
The general convention is to have one Worker per machine on a distributed network.
Create a new worker.
io: The IO object to use to communicate with the master
num_runners: The number of runners to launch
# File lib/hydra/worker.rb, line 17 def initialize(opts = {}) @verbose = opts.fetch(:verbose) { false } @io = opts.fetch(:io) { raise "No IO Object" } @runners = [] @listeners = [] @options = opts.fetch(:options) { "" } load_worker_initializer @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil }) @runner_event_listeners.select{|l| l.is_a? String}.each do |l| @runner_event_listeners.delete_at(@runner_event_listeners.index(l)) listener = eval(l) @runner_event_listeners << listener if listener.is_a?(Hydra::RunnerListener::Abstract) end @runner_log_file = opts.fetch(:runner_log_file) { nil } boot_runners(opts.fetch(:runners) { 1 }) @io.write(Hydra::Messages::Worker::WorkerBegin.new) process_messages @runners.each{|r| Process.wait r[:pid] } end
When the master sends a file down to the worker, it hits this method. Then the worker delegates the file down to a runner.
# File lib/hydra/worker.rb, line 62 def delegate_file(message) runner = idle_runner runner[:idle] = false runner[:io].write(RunFile.new(eval(message.serialize))) end
# File lib/hydra/worker.rb, line 42 def load_worker_initializer if File.exist?('./hydra_worker_init.rb') trace('Requiring hydra_worker_init.rb') require 'hydra_worker_init' else trace('hydra_worker_init.rb not present') end end
When a runner finishes, it sends the results up to the worker. Then the worker sends the results up to the master.
# File lib/hydra/worker.rb, line 70 def relay_results(message, runner) runner[:idle] = true @io.write(Results.new(eval(message.serialize))) end
When a runner wants a file, it hits this method with a message. Then the worker bubbles the file request up to the master.
# File lib/hydra/worker.rb, line 55 def request_file(message, runner) @io.write(RequestFile.new) runner[:idle] = true end
When a master issues a shutdown order, it hits this method, which causes the worker to send shutdown messages to its runners.
# File lib/hydra/worker.rb, line 77 def shutdown @running = false trace "Notifying #{@runners.size} Runners of Shutdown" @runners.each do |r| trace "Sending Shutdown to Runner" trace "\t#{r.inspect}" r[:io].write(Shutdown.new) end Thread.exit end
# File lib/hydra/worker.rb, line 120 def process_messages_from_master @listeners << Thread.new do while @running begin message = @io.gets if message and !message.class.to_s.index("Master").nil? trace "Received Message from Master" trace "\t#{message.inspect}" message.handle(self) else trace "Nothing from Master, Pinging" @io.write Ping.new end rescue IOError => ex trace "Worker lost Master" shutdown end end end end
# File lib/hydra/worker.rb, line 141 def process_messages_from_runners @runners.each do |r| @listeners << Thread.new do while @running begin message = r[:io].gets if message and !message.class.to_s.index("Runner").nil? trace "Received Message from Runner" trace "\t#{message.inspect}" message.handle(self, r) end rescue IOError => ex trace "Worker lost Runner [#{r.inspect}]" Thread.exit end end end end end