Object
Low level connection which maps commands and supports synchronous receives
alias :obj_send :send
# File lib/stomp/connection.rb, line 14 def self.default_port(ssl) ssl ? 61612 : 61613 end
A new Connection object accepts the following parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) reconnect_delay (Integer, default : 5) e.g. c = Connection.new("username", "password", "localhost", 61613, true)
Hash:
hash = { :hosts => [ {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false}, {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false} ], :initial_reconnect_delay => 0.01, :max_reconnect_delay => 30.0, :use_exponential_back_off => true, :back_off_multiplier => 2, :max_reconnect_attempts => 0, :randomize => false, :backup => false, :timeout => -1, :connect_headers => {}, :parse_timeout => 5, :logger => nil, } e.g. c = Connection.new(hash)
TODO Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port stomp://host.domain.tld:port stomp://user:pass@host:port stomp://user:pass@host.domain.tld:port
# File lib/stomp/connection.rb, line 60 def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) @received_messages = [] if login.is_a?(Hash) hashed_initialize(login) else @host = host @port = port @login = login @passcode = passcode @reliable = reliable @reconnect_delay = reconnect_delay @connect_headers = connect_headers @ssl = false @parameters = nil @parse_timeout = 5 # To override, use hashed parameters @logger = nil # To override, use hashed parameters end # Use Mutexes: only one lock per each thread # Revert to original implementation attempt @transmit_semaphore = Mutex.new @read_semaphore = Mutex.new @socket_semaphore = Mutex.new @subscriptions = {} @failure = nil @connection_attempts = 0 socket end
Syntactic sugar for 'Connection.new' See 'initialize' for usage.
# File lib/stomp/connection.rb, line 108 def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers) end
Receive a frame, block until the frame is received
# File lib/stomp/connection.rb, line 340 def __old_receive # The recive my fail so we may need to retry. while TRUE begin used_socket = socket return _receive(used_socket) rescue @failure = $! raise unless @reliable errstr = "receive failed: #{$!}" if @logger && @logger.respond_to?(:on_miscerr) @logger.on_miscerr(log_params, errstr) else $stderr.print errstr end end end end
Abort a transaction by name
# File lib/stomp/connection.rb, line 234 def abort(name, headers = {}) headers[:transaction] = name transmit("ABORT", headers) end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client'g
Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/connection.rb, line 222 def ack(message_id, headers = {}) headers['message-id'] = message_id transmit("ACK", headers) end
Begin a transaction, requires a name for the transaction
# File lib/stomp/connection.rb, line 213 def begin(name, headers = {}) headers[:transaction] = name transmit("BEGIN", headers) end
# File lib/stomp/connection.rb, line 175 def change_host @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize] # Set first as master and send it to the end of array current_host = @parameters[:hosts].shift @parameters[:hosts] << current_host @ssl = current_host[:ssl] @host = current_host[:host] @port = current_host[:port] || Connection::default_port(@ssl) @login = current_host[:login] || "" @passcode = current_host[:passcode] || "" end
# File lib/stomp/connection.rb, line 314 def client_ack?(message) headers = @subscriptions[message.headers[:destination]] !headers.nil? && headers[:ack] == "client" end
Is this connection closed?
# File lib/stomp/connection.rb, line 208 def closed? @closed end
Commit a transaction by name
# File lib/stomp/connection.rb, line 228 def commit(name, headers = {}) headers[:transaction] = name transmit("COMMIT", headers) end
Close this connection
# File lib/stomp/connection.rb, line 320 def disconnect(headers = {}) transmit("DISCONNECT", headers) headers = headers.symbolize_keys @disconnect_receipt = receive if headers[:receipt] if @logger && @logger.respond_to?(:on_disconnect) @logger.on_disconnect(log_params) end close_socket end
# File lib/stomp/connection.rb, line 92 def hashed_initialize(params) @parameters = refine_params(params) @reliable = true @reconnect_delay = @parameters[:initial_reconnect_delay] @connect_headers = @parameters[:connect_headers] @parse_timeout = @parameters[:parse_timeout] @logger = @parameters[:logger] #sets the first host to connect change_host if @logger && @logger.respond_to?(:on_connecting) @logger.on_connecting(log_params) end end
# File lib/stomp/connection.rb, line 194 def increase_reconnect_delay @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay] @reconnect_delay end
# File lib/stomp/connection.rb, line 190 def max_reconnect_attempts? !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts] end
# File lib/stomp/connection.rb, line 270 def obj_send(*args) __send__(*args) end
Is this connection open?
# File lib/stomp/connection.rb, line 203 def open? !@closed end
Return a pending message if one is available, otherwise return nil
# File lib/stomp/connection.rb, line 332 def poll # No need for a read lock here. The receive method eventually fullfills # that requirement. return nil if @socket.nil? || !@socket.ready? receive end
Publish message to destination
To disable content length header ( :suppress_content_length => true ) Accepts a transaction header ( :transaction => 'some_transaction_id' )
# File lib/stomp/connection.rb, line 265 def publish(destination, message, headers = {}) headers[:destination] = destination transmit("SEND", headers, message) end
# File lib/stomp/connection.rb, line 359 def receive super_result = __old_receive if super_result.nil? && @reliable errstr = "connection.receive returning EOF as nil - resetting connection.\n" if @logger && @logger.respond_to?(:on_miscerr) @logger.on_miscerr(log_params, errstr) else $stderr.print errstr end @socket = nil super_result = __old_receive end return super_result end
# File lib/stomp/connection.rb, line 153 def refine_params(params) params = params.uncamelize_and_symbolize_keys default_params = { :connect_headers => {}, # Failover parameters :initial_reconnect_delay => 0.01, :max_reconnect_delay => 30.0, :use_exponential_back_off => true, :back_off_multiplier => 2, :max_reconnect_attempts => 0, :randomize => false, :backup => false, :timeout => -1, # Parse Timeout :parse_timeout => 5 } default_params.merge(params) end
# File lib/stomp/connection.rb, line 274 def send(*args) warn("This method is deprecated and will be removed on the next release. Use 'publish' instead") publish(*args) end
# File lib/stomp/connection.rb, line 112 def socket @socket_semaphore.synchronize do used_socket = @socket used_socket = nil if closed? while used_socket.nil? || !@failure.nil? @failure = nil begin used_socket = open_socket # Open complete connect(used_socket) if @logger && @logger.respond_to?(:on_connected) @logger.on_connected(log_params) end @connection_attempts = 0 rescue @failure = $! used_socket = nil raise unless @reliable if @logger && @logger.respond_to?(:on_connectfail) @logger.on_connectfail(log_params) else $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n" end raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts? sleep(@reconnect_delay) @connection_attempts += 1 if @parameters change_host increase_reconnect_delay end end end @socket = used_socket end end
Subscribe to a destination, must specify a name
# File lib/stomp/connection.rb, line 240 def subscribe(name, headers = {}, subId = nil) headers[:destination] = name transmit("SUBSCRIBE", headers) # Store the sub so that we can replay if we reconnect. if @reliable subId = name if subId.nil? @subscriptions[subId] = headers end end
Send a message back to the source or to the dead letter queue
Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" ) Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ) Accepts a force client acknowledgement option (:force_client_ack => true)
# File lib/stomp/connection.rb, line 284 def unreceive(message, options = {}) options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge options # Lets make sure all keys are symbols message.headers = message.headers.symbolize_keys retry_count = message.headers[:retry_count].to_i || 0 message.headers[:retry_count] = retry_count + 1 transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}" message_id = message.headers.delete(:'message-id') begin self.begin transaction_id if client_ack?(message) || options[:force_client_ack] self.ack(message_id, :transaction => transaction_id) end if retry_count <= options[:max_redeliveries] self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id)) else # Poison ack, sending the message to the DLQ self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true)) end self.commit transaction_id rescue Exception => exception self.abort transaction_id raise exception end end
Unsubscribe from a destination, must specify a name
# File lib/stomp/connection.rb, line 252 def unsubscribe(name, headers = {}, subId = nil) headers[:destination] = name transmit("UNSUBSCRIBE", headers) if @reliable subId = name if subId.nil? @subscriptions.delete(subId) end end
Generated with the Darkfish Rdoc Generator 2.