class Stomp::Connection

Low level connection which maps commands and supports synchronous receives

Attributes

autoflush[RW]

Autoflush forces a flush on each transmit. This may be changed dynamically by calling code.

connection_frame[R]

The CONNECTED frame from the broker.

disconnect_receipt[R]

Any disconnect RECEIPT frame if requested.

hb_received[R]

Heartbeat receive has been on time.

hb_sent[R]

Heartbeat send has been successful.

protocol[R]

The Stomp Protocol version.

session[R]

A unique session ID, assigned by the broker.

Public Class Methods

default_port(ssl) click to toggle source

::default_port returns the default port used by the gem for TCP or SSL.

# File lib/stomp/connection.rb, line 38
def self.default_port(ssl)
  ssl ? 61612 : 61613
end
new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

A new Connection object can be initialized using two forms:

Hash (this is the recommended Connection initialization method):

hash = {
  :hosts => [
    {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
    {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
  ],
  :reliable => true,
  :initial_reconnect_delay => 0.01,
  :max_reconnect_delay => 30.0,
  :use_exponential_back_off => true,
  :back_off_multiplier => 2,
  :max_reconnect_attempts => 0,
  :randomize => false,
  :connect_timeout => 0,
  :connect_headers => {},
  :parse_timeout => 5,
  :logger => nil,
  :dmh => false,
  :closed_check => true,
  :hbser => false,
  :stompconn => false,
  :usecrlf => false,
}

e.g. c = Stomp::Connection.new(hash)

Positional parameters:

login             (String,  default : '')
passcode          (String,  default : '')
host              (String,  default : 'localhost')
port              (Integer, default : 61613)
reliable          (Boolean, default : false)
reconnect_delay   (Integer, default : 5)

e.g. c = Stomp::Connection.new("username", "password", "localhost", 61613, true)
# File lib/stomp/connection.rb, line 82
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @protocol = Stomp::SPL_10 # Assumed at first
  @hb_received = true       # Assumed at first
  @hb_sent = true           # Assumed at first
  @hbs = @hbr = false       # Sending/Receiving heartbeats. Assume no for now.

  if login.is_a?(Hash)
    hashed_initialize(login)
  else
    @host = host
    @port = port
    @login = login
    @passcode = passcode
    @reliable = reliable
    @reconnect_delay = reconnect_delay
    @connect_headers = connect_headers
    @ssl = false
    @parameters = nil
    @parse_timeout = 5              # To override, use hashed parameters
    @connect_timeout = 0    # To override, use hashed parameters
    @logger = nil                   # To override, use hashed parameters
    @autoflush = false    # To override, use hashed parameters or setter
    @closed_check = true  # Run closed check in each protocol method
    @hbser = false        # Raise if heartbeat send exception
    @stompconn = false    # If true, use STOMP rather than CONNECT
    @usecrlf = false      # If true, use \r\n as line ends (1.2 only)
    warn "login looks like a URL, do you have the correct parameters?" if @login =~ /:\/\//
  end

  # Use Mutexes:  only one lock per each thread.
  # Reverted to original implementation attempt using Mutex.
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new

  @subscriptions = {}
  @failure = nil
  @connection_attempts = 0

  socket
end
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) click to toggle source

open is syntactic sugar for 'Connection.new', see 'initialize' for usage.

# File lib/stomp/connection.rb, line 145
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
end

Public Instance Methods

abort(name, headers = {}) click to toggle source

Abort aborts a transaction by name.

# File lib/stomp/connection.rb, line 231
def abort(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  transmit(Stomp::CMD_ABORT, headers)
end
ack(message_id, headers = {}) click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement i.e. connection.subscribe(“/queue/a”, :ack => 'client'). Accepts an optional transaction header ( :transaction => 'some_transaction_id' ) Behavior is protocol level dependent, see the specifications or comments below.

# File lib/stomp/connection.rb, line 172
def ack(message_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
  headers = headers.symbolize_keys

  case @protocol
    when Stomp::SPL_12
      # The ACK frame MUST include an id header matching the ack header 
      # of the MESSAGE being acknowledged.
      headers[:id] = message_id
    when Stomp::SPL_11
      # ACK has two REQUIRED headers: message-id, which MUST contain a value 
      # matching the message-id for the MESSAGE being acknowledged and 
      # subscription, which MUST be set to match the value of the subscription's 
      # id header.
      headers[:'message-id'] = message_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
    else # Stomp::SPL_10
      # ACK has one required header, message-id, which must contain a value 
      # matching the message-id for the MESSAGE being acknowledged.
      headers[:'message-id'] = message_id
  end
  _headerCheck(headers)
  transmit(Stomp::CMD_ACK, headers)
end
begin(name, headers = {}) click to toggle source

Begin starts a transaction, and requires a name for the transaction

# File lib/stomp/connection.rb, line 160
def begin(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  transmit(Stomp::CMD_BEGIN, headers)
end
client_ack?(message) click to toggle source

client_ack? determines if headers contain :ack => “client”.

# File lib/stomp/connection.rb, line 335
def client_ack?(message)
  headers = @subscriptions[message.headers[:destination]]
  !headers.nil? && headers[:ack] == "client"
end
closed?() click to toggle source

closed? tests if this connection is closed.

# File lib/stomp/connection.rb, line 155
def closed?
  @closed
end
commit(name, headers = {}) click to toggle source

Commit commits a transaction by name.

# File lib/stomp/connection.rb, line 222
def commit(name, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:transaction] = name
  _headerCheck(headers)
  transmit(Stomp::CMD_COMMIT, headers)
end
disconnect(headers = {}) click to toggle source

disconnect closes this connection. If requested, a disconnect RECEIPT will be received.

# File lib/stomp/connection.rb, line 342
def disconnect(headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  _headerCheck(headers)
  if @protocol >= Stomp::SPL_11
    @st.kill if @st # Kill ticker thread if any
    @rt.kill if @rt # Kill ticker thread if any
  end
  transmit(Stomp::CMD_DISCONNECT, headers)
  @disconnect_receipt = receive if headers[:receipt]
  if @logger && @logger.respond_to?(:on_disconnect)
    @logger.on_disconnect(log_params)
  end
  close_socket
end
hashed_initialize(params) click to toggle source

#hashed_initialize prepares a new connection with a Hash of initialization parameters.

# File lib/stomp/connection.rb, line 126
def hashed_initialize(params)

  @parameters = refine_params(params)
  @reliable =  @parameters[:reliable]
  @reconnect_delay = @parameters[:initial_reconnect_delay]
  @connect_headers = @parameters[:connect_headers]
  @parse_timeout =  @parameters[:parse_timeout]
  @connect_timeout =  @parameters[:connect_timeout]
  @logger =  @parameters[:logger]
  @autoflush = @parameters[:autoflush]
  @closed_check = @parameters[:closed_check]
  @hbser = @parameters[:hbser]
  @stompconn = @parameters[:stompconn]
  @usecrlf = @parameters[:usecrlf]
  #sets the first host to connect
  change_host
end
hbrecv_count() click to toggle source

#hbrecv_count returns the current connection's heartbeat receive count.

# File lib/stomp/connection.rb, line 443
def hbrecv_count()
  return 0 unless @hbrecv_count
  @hbrecv_count
end
hbrecv_interval() click to toggle source

#hbrecv_interval returns the connection's heartbeat receive interval.

# File lib/stomp/connection.rb, line 431
def hbrecv_interval()
  return 0 unless @hbrecv_interval
  @hbrecv_interval / 1000.0 # ms
end
hbsend_count() click to toggle source

#hbsend_count returns the current connection's heartbeat send count.

# File lib/stomp/connection.rb, line 437
def hbsend_count()
  return 0 unless @hbsend_count
  @hbsend_count
end
hbsend_interval() click to toggle source

#hbsend_interval returns the connection's heartbeat send interval.

# File lib/stomp/connection.rb, line 425
def hbsend_interval()
  return 0 unless @hbsend_interval
  @hbsend_interval / 1000.0 # ms
end
nack(message_id, headers = {}) click to toggle source

STOMP 1.1+ NACK.

# File lib/stomp/connection.rb, line 199
def nack(message_id, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10
  raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
  headers = headers.symbolize_keys
  case @protocol
    when Stomp::SPL_12
      # The ACK frame MUST include an id header matching the ack header 
      # of the MESSAGE being acknowledged.
      headers[:id] = message_id
    else # Stomp::SPL_11 only
      # ACK has two REQUIRED headers: message-id, which MUST contain a value 
      # matching the message-id for the MESSAGE being acknowledged and 
      # subscription, which MUST be set to match the value of the subscription's 
      # id header.
      headers[:'message-id'] = message_id
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
  end
  _headerCheck(headers)
  transmit(Stomp::CMD_NACK, headers)
end
open?() click to toggle source

open? tests if this connection is open.

# File lib/stomp/connection.rb, line 150
def open?
  !@closed
end
poll() click to toggle source

poll returns a pending message if one is available, otherwise returns nil.

# File lib/stomp/connection.rb, line 360
def poll()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  # No need for a read lock here.  The receive method eventually fulfills
  # that requirement.
  return nil if @socket.nil? || !@socket.ready?
  receive()
end
publish(destination, message, headers = {}) click to toggle source

Publish message to destination. To disable content length header use header ( :suppress_content_length => true ). Accepts a transaction header ( :transaction => 'some_transaction_id' ).

# File lib/stomp/connection.rb, line 284
def publish(destination, message, headers = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:destination] = destination
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_publish)
    @logger.on_publish(log_params, message, headers)
  end
  transmit(Stomp::CMD_SEND, headers, message)
end
receive() click to toggle source

receive returns the next Message off of the wire.

# File lib/stomp/connection.rb, line 369
def receive()
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  super_result = __old_receive
  if super_result.nil? && @reliable && !closed?
    errstr = "connection.receive returning EOF as nil - resetting connection.\n"
    if @logger && @logger.respond_to?(:on_miscerr)
      @logger.on_miscerr(log_params, "es_recv: " + errstr)
    else
      $stderr.print errstr
    end
    @socket = nil
    super_result = __old_receive
  end
  #
  if @logger && @logger.respond_to?(:on_receive)
    @logger.on_receive(log_params, super_result)
  end
  return super_result
end
set_logger(logger) click to toggle source

#set_logger selects a new callback logger instance.

# File lib/stomp/connection.rb, line 390
def set_logger(logger)
  @logger = logger
end
sha1(data) click to toggle source

sha1 returns a SHA1 digest for arbitrary string data.

# File lib/stomp/connection.rb, line 406
def sha1(data)
  Digest::SHA1.hexdigest(data)
end
subscribe(name, headers = {}, subId = nil) click to toggle source

Subscribe subscribes to a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.

# File lib/stomp/connection.rb, line 241
def subscribe(name, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:destination] = name
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
    headers[:id] = subId if headers[:id].nil?
  end
  _headerCheck(headers)
  if @logger && @logger.respond_to?(:on_subscribe)
    @logger.on_subscribe(log_params, headers)
  end

  # Store the subscription so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    raise Stomp::Error::DuplicateSubscription if @subscriptions[subId]
    @subscriptions[subId] = headers
  end

  transmit(Stomp::CMD_SUBSCRIBE, headers)
end
unreceive(message, options = {}) click to toggle source

Send a message back to the source or to the dead letter queue. Accepts a dead letter queue option ( :dead_letter_queue => “/queue/DLQ” ). Accepts a limit number of redeliveries option ( :max_redeliveries => 6 ). Accepts a force client acknowledgement option (:force_client_ack => true).

# File lib/stomp/connection.rb, line 299
def unreceive(message, options = {})
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge(options)
  # Lets make sure all keys are symbols
  message.headers = message.headers.symbolize_keys

  retry_count = message.headers[:retry_count].to_i || 0
  message.headers[:retry_count] = retry_count + 1
  transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
  message_id = message.headers.delete(:'message-id')

  begin
    self.begin transaction_id

    if client_ack?(message) || options[:force_client_ack]
      self.ack(message_id, :transaction => transaction_id)
    end

    if retry_count <= options[:max_redeliveries]
      self.publish(message.headers[:destination], message.body, 
        message.headers.merge(:transaction => transaction_id))
    else
      # Poison ack, sending the message to the DLQ
      self.publish(options[:dead_letter_queue], message.body, 
        message.headers.merge(:transaction => transaction_id, 
        :original_destination => message.headers[:destination], 
        :persistent => true))
    end
    self.commit transaction_id
  rescue Exception => exception
    self.abort transaction_id
    raise exception
  end
end
unsubscribe(dest, headers = {}, subId = nil) click to toggle source

Unsubscribe from a destination. A subscription name is required. For Stomp 1.1+ a session unique subscription ID is also required.

# File lib/stomp/connection.rb, line 266
def unsubscribe(dest, headers = {}, subId = nil)
  raise Stomp::Error::NoCurrentConnection if @closed_check && closed?
  headers = headers.symbolize_keys
  headers[:destination] = dest
  if @protocol >= Stomp::SPL_11
    raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
  end
  _headerCheck(headers)
  transmit(Stomp::CMD_UNSUBSCRIBE, headers)
  if @reliable
    subId = dest if subId.nil?
    @subscriptions.delete(subId)
  end
end
uuid() click to toggle source

uuid returns a type 4 UUID.

# File lib/stomp/connection.rb, line 411
def uuid()
  b = []
  0.upto(15) do |i|
    b << rand(255)
  end
  b[6] = (b[6] & 0x0F) | 0x40
  b[8] = (b[8] & 0xbf) | 0x80
  #             0  1  2  3   4   5  6  7   8  9  10 11 12 13 14 15
  rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x",
  b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15])
  rs
end
valid_utf8?(s) click to toggle source

valid_utf8? returns an indicator if the given string is a valid UTF8 string.

# File lib/stomp/connection.rb, line 395
def valid_utf8?(s)
  case RUBY_VERSION
  when /1\.8/
    rv = _valid_utf8?(s)
  else
    rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding?
  end
  rv
end

Private Instance Methods

__old_receive() click to toggle source

#__old_receive receives a frame, blocks until the frame is received.

# File lib/connection/utils.rb, line 216
def __old_receive()
  # The receive may fail so we may need to retry.
  while TRUE
    begin
      used_socket = socket()
      return _receive(used_socket)
    rescue
      @failure = $!
      raise unless @reliable
      errstr = "receive failed: #{$!}"
      if @logger && @logger.respond_to?(:on_miscerr)
        @logger.on_miscerr(log_params, "es_oldrecv: " + errstr)
      else
        $stderr.print errstr
      end
    end
  end
end
_decodeHeaders(h) click to toggle source

decode returns a Hash of decoded headers per the Stomp 1.1 specification.

# File lib/connection/utf8.rb, line 272
def _decodeHeaders(h)
  dh = {}
  h.each_pair do |k,v|
    # Keys here are NOT! symbolized
    if v.is_a?(Array)
      kdec = Stomp::HeaderCodec::decode(k)
      dh[kdec] = []
      v.each do |e|
        dh[kdec] << Stomp::HeaderCodec::decode(e)
      end
    else
      vs = v.to_s
      dh[Stomp::HeaderCodec::decode(k)] = Stomp::HeaderCodec::decode(vs)
    end
  end
  dh
end
_encodeHeaders(h) click to toggle source

encode returns a Hash of encoded headers per the Stomp 1.1 specification.

# File lib/connection/utf8.rb, line 252
def _encodeHeaders(h)
  eh = {}
  h.each_pair do |k,v|
    # Keys are symbolized
    ks = k.to_s
    if v.is_a?(Array)
      kenc = Stomp::HeaderCodec::encode(ks)
      eh[kenc] = []
      v.each do |e|
        eh[kenc] << Stomp::HeaderCodec::encode(e)
      end
    else
      vs = v.to_s
      eh[Stomp::HeaderCodec::encode(ks)] = Stomp::HeaderCodec::encode(vs)
    end
  end
  eh
end
_expand_hosts(hash) click to toggle source

Support multi-homed servers.

# File lib/connection/utils.rb, line 14
def _expand_hosts(hash)
  new_hash = hash.clone
  new_hash[:hosts_cloned] = hash[:hosts].clone
  new_hash[:hosts] = []
  #
  hash[:hosts].each do |host_parms|
    ai = Socket.getaddrinfo(host_parms[:host], nil, nil, Socket::SOCK_STREAM)
    next if ai.nil? || ai.size == 0
    info6 = ai.detect {|info| info[4] == Socket::AF_INET6}
    info4 = ai.detect {|info| info[4] == Socket::AF_INET}
    if info6
      new_hostp = host_parms.clone
      new_hostp[:host] = info6[3]
      new_hash[:hosts] << new_hostp
    end
    if info4
      new_hostp = host_parms.clone
      new_hostp[:host] = info4[3]
      new_hash[:hosts] << new_hostp
    end
  end
  return new_hash
end
_headerCheck(h) click to toggle source

Stomp 1.1+ header check for UTF8 validity. Raises Stomp::Error::UTF8ValidationError if header data is not valid UTF8.

# File lib/connection/utf8.rb, line 228
def _headerCheck(h)
  return if @protocol == Stomp::SPL_10 # Do nothing for this environment
  #
  h.each_pair do |k,v|
    # Keys here are symbolized
    ks = k.to_s
    ks.force_encoding(Stomp::UTF8) if ks.respond_to?(:force_encoding)
    raise Stomp::Error::UTF8ValidationError unless valid_utf8?(ks)
    #
    if v.is_a?(Array)
      v.each do |e|
        e.force_encoding(Stomp::UTF8) if e.respond_to?(:force_encoding)
        raise Stomp::Error::UTF8ValidationError unless valid_utf8?(e)
      end
    else
      vs = v.to_s + "" # Values are usually Strings, but could be TrueClass or Symbol
      # The + "" above forces an 'unfreeze' if necessary
      vs.force_encoding(Stomp::UTF8) if vs.respond_to?(:force_encoding)
      raise Stomp::Error::UTF8ValidationError unless valid_utf8?(vs)
    end
  end
end
_init_heartbeats() click to toggle source
# File lib/connection/heartbeats.rb, line 21
def _init_heartbeats()
  return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats.  OK.

  # Init.

  #
  @cx = @cy = @sx = @sy = 0   # Variable names as in spec

  #
  @hbsend_interval = @hbrecv_interval = 0.0 # Send/Receive ticker interval.

  #
  @hbsend_count = @hbrecv_count = 0 # Send/Receive ticker counts.

  #
  @ls = @lr = -1.0 # Last send/receive time (from Time.now.to_f)

  #
  @st = @rt = nil # Send/receive ticker thread

  # Handle current client / server capabilities.

  #
  cfh = @connection_frame.headers.symbolize_keys
  return if cfh[:"heart-beat"] == "0,0" # Server does not want heartbeats

  # Conect header parts.
  parts = @connect_headers[:"heart-beat"].split(",")
  @cx = parts[0].to_i
  @cy = parts[1].to_i

  # Connected frame header parts.
  parts = cfh[:"heart-beat"].split(",")
  @sx = parts[0].to_i
  @sy = parts[1].to_i

  # Catch odd situations like server has used => heart-beat:000,00000
  return if (@cx == 0 && @cy == 0) || (@sx == 0 && @sy == 0)

  # See if we are doing anything at all.

  @hbs = @hbr = true # Sending/Receiving heartbeats. Assume yes at first.
  # Check if sending is possible.
  @hbs = false if @cx == 0 || @sy == 0  # Reset if neither side wants
  # Check if receiving is possible.
  @hbr = false if @sx == 0 || @cy == 0  # Reset if neither side wants

  # Check if we should not do heartbeats at all
  return if (!@hbs && !@hbr)

  # If sending
  if @hbs
    sm = @cx >= @sy ? @cx : @sy     # ticker interval, ms
    @hbsend_interval = 1000.0 * sm  # ticker interval, μs
    @ls = Time.now.to_f             # best guess at start
    _start_send_ticker()
  end

  # If receiving
  if @hbr
    rm = @sx >= @cy ? @sx : @cy     # ticker interval, ms
    @hbrecv_interval = 1000.0 * rm  # ticker interval, μs
    @lr = Time.now.to_f             # best guess at start
    _start_receive_ticker()
  end

end
_normalize_line_end(line) click to toggle source

Normalize line ends because 1.2+ brokers can send 'mixed mode' headers, i.e.:

  • Some headers end with 'n'

  • Other headers end with 'rn'

# File lib/connection/netio.rb, line 97
def _normalize_line_end(line)
  return line unless @usecrlf
  # p [ "nleln", line ]
  line_len = line.respond_to?(:bytesize) ? line.bytesize : line.length
  last2 = line[line_len-2...line_len]
  # p [ "nlel2", last2 ]
  return line unless last2 == "\r\n"
  return line[0...line_len-2] + "\n"
end
_post_connect() click to toggle source

#_post_connect handles low level logic just after a physical connect.

# File lib/connection/utils.rb, line 84
def _post_connect()
  return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0
  if @connection_frame.command == Stomp::CMD_ERROR
    @connection_frame.headers = _decodeHeaders(@connection_frame.headers)
    return
  end
  # We are CONNECTed
  cfh = @connection_frame.headers.symbolize_keys
  @protocol = cfh[:version]
  if @protocol
    # Should not happen, but check anyway
    raise Stomp::Error::UnsupportedProtocolError unless Stomp::SUPPORTED.index(@protocol)
  else # CONNECTed to a 1.0 server that does not return *any* 1.1 type headers
    @protocol = Stomp::SPL_10 # reset
    return
  end
  # Heartbeats
  return unless @connect_headers[:"heart-beat"]
  _init_heartbeats()
end
_pre_connect() click to toggle source

#_pre_connect handles low level logic just prior to a physical connect.

# File lib/connection/utils.rb, line 61
def _pre_connect()
  @connect_headers = @connect_headers.symbolize_keys
  raise Stomp::Error::ProtocolErrorConnect if (@connect_headers[:"accept-version"] && !@connect_headers[:host])
  raise Stomp::Error::ProtocolErrorConnect if (!@connect_headers[:"accept-version"] && @connect_headers[:host])
  return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0
  # Try 1.1 or greater
  @hhas10 = false
  okvers = []
  avers = @connect_headers[:"accept-version"].split(",")
  avers.each do |nver|
    if Stomp::SUPPORTED.index(nver)
      okvers << nver
      @hhas10 = true if nver == Stomp::SPL_10
    end
  end
  raise Stomp::Error::UnsupportedProtocolError if okvers == []
  @connect_headers[:"accept-version"] = okvers.join(",") # This goes to server
  # Heartbeats - pre connect
  return unless @connect_headers[:"heart-beat"]
  _validate_hbheader()
end
_receive(read_socket) click to toggle source

Really read from the wire.

# File lib/connection/netio.rb, line 14
def _receive(read_socket)
  @read_semaphore.synchronize do
    line = ''
    if @protocol == Stomp::SPL_10 || (@protocol >= Stomp::SPL_11 && !@hbr)
      line = read_socket.gets # The old way
    else # We are >= 1.1 *AND* receiving heartbeats.
      while true
        line = read_socket.gets # Data from wire
        break unless line == "\n"
        line = ''
        @lr = Time.now.to_f
      end
    end
    return nil if line.nil?
    # p [ "wiredatain_01", line ]
    line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12

    # If the reading hangs for more than X seconds, abort the parsing process.
    # X defaults to 5.  Override allowed in connection hash parameters.
    Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do
      # Reads the beginning of the message until it runs into a empty line
      message_header = ''
      begin
        message_header += line
        line = read_socket.gets
        # p [ "wiredatain_02", line ]
        raise Stomp::Error::StompServerError if line.nil?
        line = _normalize_line_end(line) if @protocol >= Stomp::SPL_12
      end until line =~ /^\s?\n$/

      # Checks if it includes content_length header
      content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/
      message_body = ''

      # If content_length is present, read the specified amount of bytes
      if content_length
        message_body = read_socket.read content_length[1].to_i
        raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\00""
        # Else read the rest of the message until the first \0
      else
        message_body = read_socket.readline("\00"")
        message_body.chop!
      end

      # If the buffer isn't empty, reads trailing new lines.
      #
      # Note: experiments with JRuby seem to show that .ready? never
      # returns true.  This means that this code to drain trailing new
      # lines never runs using JRuby.
      #
      # Note 2: the draining of new lines must be done _after_ a message
      # is read.  Do _not_ leave them on the wire and attempt to drain them
      # at the start of the next read.  Attempting to do that breaks the
      # asynchronous nature of the 'poll' method.
      while read_socket.ready?
        last_char = read_socket.getc
        break unless last_char
        if parse_char(last_char) != "\n"
          read_socket.ungetc(last_char)
          break
        end
      end
      # And so, a JRuby hack.  Remove any new lines at the start of the
      # next buffer.
      message_header.gsub!(/^\n?/, "")

      if @protocol >= Stomp::SPL_11
        @lr = Time.now.to_f if @hbr
      end
      # Adds the excluded \n and \0 and tries to create a new message with it
      msg = Message.new(message_header + "\n" + message_body + "\00"", @protocol >= Stomp::SPL_11)
      #
      if @protocol >= Stomp::SPL_11 && msg.command != Stomp::CMD_CONNECTED
        msg.headers = _decodeHeaders(msg.headers)
      end
      msg
    end
  end
end
_start_receive_ticker() click to toggle source

#_start_receive_ticker starts a thread that receives heartbeats when required.

# File lib/connection/heartbeats.rb, line 129
def _start_receive_ticker()
  sleeptime = @hbrecv_interval / 1000000.0 # Sleep time secs
  @rt = Thread.new {
    while true do
      sleep sleeptime
      curt = Time.now.to_f
      if @logger && @logger.respond_to?(:on_hbfire)
        @logger.on_hbfire(log_params, "receive_fire", curt)
      end
      delta = curt - @lr
      if delta > ((@hbrecv_interval + (@hbrecv_interval/5.0)) / 1000000.0) # Be tolerant (plus)
        if @logger && @logger.respond_to?(:on_hbfire)
          @logger.on_hbfire(log_params, "receive_heartbeat", curt)
        end
        # Client code could be off doing something else (that is, no reading of
        # the socket has been requested by the caller).  Try to  handle that case.
        lock = @read_semaphore.try_lock
        if lock
          last_char = @socket.getc
          plc = parse_char(last_char)
          if plc == "\n" # Server Heartbeat
            @lr = Time.now.to_f
          else
            @socket.ungetc(last_char)
          end
          @read_semaphore.unlock
          @hbrecv_count += 1
        else
          # Shrug.  Have not received one.  Just set warning flag.
          @hb_received = false
          if @logger && @logger.respond_to?(:on_hbread_fail)
            @logger.on_hbread_fail(log_params, {"ticker_interval" => @hbrecv_interval})
          end
        end
      else
        @hb_received = true # Reset if necessary
      end
      Thread.pass
    end
  }
end
_start_send_ticker() click to toggle source

#_start_send_ticker starts a thread to send heartbeats when required.

# File lib/connection/heartbeats.rb, line 90
def _start_send_ticker()
  sleeptime = @hbsend_interval / 1000000.0 # Sleep time secs
  @st = Thread.new {
    while true do
      sleep sleeptime
      curt = Time.now.to_f
      if @logger && @logger.respond_to?(:on_hbfire)
        @logger.on_hbfire(log_params, "send_fire", curt)
      end
      delta = curt - @ls
      if delta > (@hbsend_interval - (@hbsend_interval/5.0)) / 1000000.0 # Be tolerant (minus)
        if @logger && @logger.respond_to?(:on_hbfire)
          @logger.on_hbfire(log_params, "send_heartbeat", curt)
        end
        # Send a heartbeat
        @transmit_semaphore.synchronize do
          begin
            @socket.puts
            @ls = curt      # Update last send
            @hb_sent = true # Reset if necessary
            @hbsend_count += 1
          rescue Exception => sendex
            @hb_sent = false # Set the warning flag
            if @logger && @logger.respond_to?(:on_hbwrite_fail)
              @logger.on_hbwrite_fail(log_params, {"ticker_interval" => @hbsend_interval,
              "exception" => sendex})
            end
            if @hbser
              raise # Re-raise if user requested this, otherwise ignore
            end
          end
        end
      end
      Thread.pass
    end
  }
end
_transmit(used_socket, command, headers = {}, body = '') click to toggle source

_transmit is the real wire write logic.

# File lib/connection/netio.rb, line 131
def _transmit(used_socket, command, headers = {}, body = '')
  if @protocol >= Stomp::SPL_11 && command != Stomp::CMD_CONNECT
    headers = _encodeHeaders(headers)
  end
  @transmit_semaphore.synchronize do
    # Handle nil body
    body = '' if body.nil?
    # The content-length should be expressed in bytes.
    # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters
    # With Unicode strings, # of bytes != # of characters.  So, use String#bytesize when available.
    body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length

    # ActiveMQ interprets every message as a BinaryMessage
    # if content_length header is included.
    # Using :suppress_content_length => true will suppress this behaviour
    # and ActiveMQ will interpret the message as a TextMessage.
    # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/
    # Lets send this header in the message, so it can maintain state when using unreceive
    headers[:'content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length]
    headers[:'content-type'] = "text/plain; charset=UTF-8" unless headers[:'content-type']
    _wire_write(used_socket,command)
    headers.each do |k,v|
      if v.is_a?(Array)
        v.each do |e|
          _wire_write(used_socket,"#{k}:#{e}")
        end
      else
        _wire_write(used_socket,"#{k}:#{v}")
      end
    end
    _wire_write(used_socket,"")
    used_socket.write body
    used_socket.write "\00""
    used_socket.flush if autoflush

    if @protocol >= Stomp::SPL_11
      @ls = Time.now.to_f if @hbs
    end

  end
end
_valid_utf8?(string) click to toggle source

Ref: unicode.org/mail-arch/unicode-ml/y2003-m02/att-0467/01-The_Algorithm_to_Valide_an_UTF-8_String

CONSIDER replacing this with a dependency on the utf8_validator gem. This code has been copied from there.

# File lib/connection/utf8.rb, line 19
def _valid_utf8?(string)
  case RUBY_VERSION
  when /1\.8\.[56]/
    bytes = []
    0.upto(string.length-1) {|i|
      bytes << string[i]
    }
  else
    bytes = string.bytes
  end

  #
  valid = true
  index = -1
  nb_hex = nil
  ni_hex = nil
  state = "start"
  next_byte_save = nil
  #
  bytes.each do |next_byte|
    index += 1
    next_byte_save = next_byte
    ni_hex = sprintf "%x", index
    nb_hex = sprintf "%x", next_byte
    # puts "Top: #{next_byte}(0x#{nb_hex}), index: #{index}(0x#{ni_hex})" if DEBUG
    case state

      # State: 'start'
      # The 'start' state:
      # * handles all occurrences of valid single byte characters i.e., the ASCII character set
      # * provides state transition logic for start bytes of valid characters with 2-4 bytes
      # * signals a validation failure for all other single bytes
      #
    when "start"
      # puts "state: start" if DEBUG
      case next_byte

        # ASCII
        # * Input = 0x00-0x7F : change state to START
      when (0x00..0x7f)
        # puts "state: start 1" if DEBUG
        state = "start"

        # Start byte of two byte characters
        # * Input = 0xC2-0xDF: change state to A
      when (0xc2..0xdf)
        # puts "state: start 2" if DEBUG
        state = "a"

        # Start byte of some three byte characters
        # * Input = 0xE1-0xEC, 0xEE-0xEF: change state to B
      when (0xe1..0xec)
        # puts "state: start 3" if DEBUG
        state = "b"
      when (0xee..0xef)
        # puts "state: start 4" if DEBUG
        state = "b"

        # Start byte of special three byte characters
        # * Input = 0xE0: change state to C
      when 0xe0
        # puts "state: start 5" if DEBUG
        state = "c"

        # Start byte of the remaining three byte characters
        # * Input = 0xED: change state to D
      when 0xed
        # puts "state: start 6" if DEBUG
        state = "d"

        # Start byte of some four byte characters
        # * Input = 0xF1-0xF3:change state to E
      when (0xf1..0xf3)
        # puts "state: start 7" if DEBUG
        state = "e"

        # Start byte of special four byte characters
        # * Input = 0xF0: change state to F
      when 0xf0
        # puts "state: start 8" if DEBUG
        state = "f"

        # Start byte of very special four byte characters
        # * Input = 0xF4: change state to G
      when 0xf4
        # puts "state: start 9" if DEBUG
        state = "g"

        # All other single characters are invalid
        # * Input = Others (0x80-0xBF,0xC0-0xC1, 0xF5-0xFF): ERROR
      else
        valid = false
        break
      end # of the inner case, the 'start' state

      # The last continuation byte of a 2, 3, or 4 byte character
      # State: 'a'
      #  o Input = 0x80-0xBF: change state to START
      #  o Others: ERROR
    when "a"
      # puts "state: a" if DEBUG
      if (0x80..0xbf) === next_byte
        state = "start"
      else
        valid = false
        break
      end

      # The first continuation byte for most 3 byte characters
      # (those with start bytes in: 0xe1-0xec or 0xee-0xef)
      # State: 'b'
      # o Input = 0x80-0xBF: change state to A
      # o Others: ERROR
    when "b"
      # puts "state: b" if DEBUG
      if (0x80..0xbf) === next_byte
        state = "a"
      else
        valid = false
        break
      end

      # The first continuation byte for some special 3 byte characters
      # (those with start byte 0xe0)
      # State: 'c'
      # o Input = 0xA0-0xBF: change state to A
      # o Others: ERROR
    when "c"
      # puts "state: c" if DEBUG
      if (0xa0..0xbf) === next_byte
        state = "a"
      else
        valid = false
        break
      end

      # The first continuation byte for the remaining 3 byte characters
      # (those with start byte 0xed)
      # State: 'd'
      # o Input = 0x80-0x9F: change state to A
      # o Others: ERROR
    when "d"
      # puts "state: d" if DEBUG
      if (0x80..0x9f) === next_byte
        state = "a"
      else
        valid = false
        break
      end

      # The first continuation byte for some 4 byte characters
      # (those with start bytes in: 0xf1-0xf3)
      # State: 'e'
      # o Input = 0x80-0xBF: change state to B
      # o Others: ERROR
    when "e"
      # puts "state: e" if DEBUG
      if (0x80..0xbf) === next_byte
        state = "b"
      else
        valid = false
        break
      end

      # The first continuation byte for some special 4 byte characters
      # (those with start byte 0xf0)
      # State: 'f'
      # o Input = 0x90-0xBF: change state to B
      # o Others: ERROR
    when "f"
      # puts "state: f" if DEBUG
      if (0x90..0xbf) === next_byte
        state = "b"
      else
        valid = false
        break
      end

      # The first continuation byte for the remaining 4 byte characters
      # (those with start byte 0xf4)
      # State: 'g'
      # o Input = 0x80-0x8F: change state to B
      # o Others: ERROR
    when "g"
      # puts "state: g" if DEBUG
      if (0x80..0x8f) === next_byte
        state = "b"
      else
        valid = false
        break
      end

      #
    else
      raise RuntimeError, "state: default"
    end
  end
  #
  # puts "State at end: #{state}" if DEBUG
  # Catch truncation at end of string
  if valid and state != 'start'
    # puts "Resetting valid value" if DEBUG
    valid = false
  end
  #
  valid
end
_validate_hbheader() click to toggle source
# File lib/connection/heartbeats.rb, line 13
def _validate_hbheader()
  return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats.  OK.
  parts = @connect_headers[:"heart-beat"].split(",")
  if (parts.size != 2) || (parts[0] != parts[0].to_i.to_s) || (parts[1] != parts[1].to_i.to_s)
    raise Stomp::Error::InvalidHeartBeatHeaderError
  end
end
_wire_write(sock, data) click to toggle source

Use CRLF if protocol is >= 1.2, and the client requested CRLF

# File lib/connection/netio.rb, line 174
def _wire_write(sock, data)
  # p [ "debug_01", @protocol, @usecrlf ]
  if @protocol >= Stomp::SPL_12 && @usecrlf
    wiredata = "#{data}#{Stomp::CR}#{Stomp::LF}"
    # p [ "wiredataout_01:", wiredata ]
    sock.write(wiredata)
  else
    # p [ "wiredataout_02:", "#{data}\n" ]
    sock.puts data
  end
end
change_host() click to toggle source

#change_host selects the next host for retires.

# File lib/connection/utils.rb, line 184
def change_host
  @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]

  # Set first as master and send it to the end of array
  current_host = @parameters[:hosts].shift
  @parameters[:hosts] << current_host

  @ssl = current_host[:ssl]
  @host = current_host[:host]
  @port = current_host[:port] || Connection::default_port(@ssl)
  @login = current_host[:login] || ""
  @passcode = current_host[:passcode] || ""

end
close_socket() click to toggle source

#close_socket closes the current open socket, and hence the connection.

# File lib/connection/netio.rb, line 307
def close_socket()
  begin
    # Need to set @closed = true before closing the socket
    # within the @read_semaphore thread
    @closed = true
    @read_semaphore.synchronize do
      @socket.close
    end
  rescue
    #Ignoring if already closed
  end
  @closed
end
connect(used_socket) click to toggle source

connect performs a basic STOMP CONNECT operation.

# File lib/connection/netio.rb, line 337
def connect(used_socket)
  @connect_headers = {} unless @connect_headers # Caller said nil/false
  headers = @connect_headers.clone
  headers[:login] = @login
  headers[:passcode] = @passcode
  _pre_connect
  if !@hhas10 && @stompconn
    _transmit(used_socket, Stomp::CMD_STOMP, headers)
  else
    _transmit(used_socket, Stomp::CMD_CONNECT, headers)
  end
  @connection_frame = _receive(used_socket)
  _post_connect
  @disconnect_receipt = nil
  @session = @connection_frame.headers["session"] if @connection_frame
  # replay any subscriptions.
  @subscriptions.each { |k,v| _transmit(used_socket, Stomp::CMD_SUBSCRIBE, v) }
end
increase_reconnect_delay() click to toggle source

#increase_reconnect_delay increases the reconnect delay for the next connection attempt.

# File lib/connection/utils.rb, line 207
def increase_reconnect_delay

  @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off]
  @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]

  @reconnect_delay
end
log_params() click to toggle source

Create parameters for any callback logger.

# File lib/connection/utils.rb, line 44
def log_params()
  lparms = @parameters.clone if @parameters
  lparms = {} unless lparms
  lparms[:cur_host] = @host
  lparms[:cur_port] = @port
  lparms[:cur_login] = @login
  lparms[:cur_passcode] = @passcode
  lparms[:cur_ssl] = @ssl
  lparms[:cur_recondelay] = @reconnect_delay
  lparms[:cur_parseto] = @parse_timeout
  lparms[:cur_conattempts] = @connection_attempts
  lparms[:openstat] = open?
  #
  lparms
end
max_reconnect_attempts?() click to toggle source

max_reconnect_attempts? returns nil or the number of maximum reconnect attempts.

# File lib/connection/utils.rb, line 201
def max_reconnect_attempts?
  !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts]
end
open_socket() click to toggle source

#open_socket opens a TCP or SSL soclet as required.

# File lib/connection/netio.rb, line 322
def open_socket()
  used_socket = @ssl ? open_ssl_socket : open_tcp_socket
  # try to close the old connection if any
  close_socket

  @closed = false
  if @parameters # nil in some rspec tests
    @reconnect_delay = @parameters[:initial_reconnect_delay] ? @parameters[:initial_reconnect_delay] : 0.01
  end
  # Use keepalive
  used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  used_socket
end
open_ssl_socket() click to toggle source

#open_ssl_socket opens an SSL socket.

# File lib/connection/netio.rb, line 202
def open_ssl_socket()
  require 'openssl' unless defined?(OpenSSL)
  begin # Any raised SSL exceptions
    ctx = OpenSSL::SSL::SSLContext.new
    ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE # Assume for now
    #
    # Note: if a client uses :ssl => true this results in the gem using
    # the _default_ Ruby ciphers list.  This is _known_ to fail in later
    # Ruby releases.  The gem provides a default cipher list that may
    # function in these cases.  To use this connect with:
    # * :ssl => Stomp::SSLParams.new
    # * :ssl => Stomp::SSLParams.new(..., :ciphers => Stomp::DEFAULT_CIPHERS)
    #
    # If connecting with an SSLParams instance, and the _default_ Ruby
    # ciphers list is required, use:
    # * :ssl => Stomp::SSLParams.new(..., :use_ruby_ciphers => true)
    #
    # If a custom ciphers list is required, connect with:
    # * :ssl => Stomp::SSLParams.new(..., :ciphers => custom_ciphers_list)
    #
    if @ssl != true
      #
      # Here @ssl is:
      # * an instance of Stomp::SSLParams
      # Control would not be here if @ssl == false or @ssl.nil?.
      #

      # Back reference the SSLContext
      @ssl.ctx = ctx

      # Server authentication parameters if required
      if @ssl.ts_files
        ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
        truststores = OpenSSL::X509::Store.new
        fl = @ssl.ts_files.split(",")
        fl.each do |fn|
          # Add next cert file listed
          raise Stomp::Error::SSLNoTruststoreFileError if !File::exists?(fn)
          raise Stomp::Error::SSLUnreadableTruststoreFileError if !File::readable?(fn)
          truststores.add_file(fn)
        end
        ctx.cert_store = truststores
      end

      # Client authentication parameters.
      # Both cert file and key file must be present or not, it can not be a mix.
      raise Stomp::Error::SSLClientParamsError if @ssl.cert_file.nil? && !@ssl.key_file.nil?
      raise Stomp::Error::SSLClientParamsError if !@ssl.cert_file.nil? && @ssl.key_file.nil?
      if @ssl.cert_file # Any check will do here
        raise Stomp::Error::SSLNoCertFileError if !File::exists?(@ssl.cert_file)
        raise Stomp::Error::SSLUnreadableCertFileError if !File::readable?(@ssl.cert_file)
        ctx.cert = OpenSSL::X509::Certificate.new(File.read(@ssl.cert_file))
        raise Stomp::Error::SSLNoKeyFileError if !File::exists?(@ssl.key_file)
        raise Stomp::Error::SSLUnreadableKeyFileError if !File::readable?(@ssl.key_file)
        ctx.key  = OpenSSL::PKey::RSA.new(File.read(@ssl.key_file), @ssl.key_password)
      end

      # Cipher list
      if !@ssl.use_ruby_ciphers # No Ruby ciphers (the default)
        if @ssl.ciphers # User ciphers list?
          ctx.ciphers = @ssl.ciphers # Accept user supplied ciphers
        else
          ctx.ciphers = Stomp::DEFAULT_CIPHERS # Just use Stomp defaults
        end
      end
    end

    #
    ssl = nil
    if @logger && @logger.respond_to?(:on_ssl_connecting)
      @logger.on_ssl_connecting(log_params)
    end

    Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
      ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx)
      ssl.connect
    end
    def ssl.ready?
      ! @rbuffer.empty? || @io.ready?
    end
    if @ssl != true
      # Pass back results if possible
      if RUBY_VERSION =~ /1\.8\.[56]/
        @ssl.verify_result = "N/A for Ruby #{RUBY_VERSION}"
      else
        @ssl.verify_result = ssl.verify_result
      end
      @ssl.peer_cert = ssl.peer_cert
    end
    if @logger && @logger.respond_to?(:on_ssl_connected)
      @logger.on_ssl_connected(log_params)
    end
    ssl
  rescue Exception => ex
    if @logger && @logger.respond_to?(:on_ssl_connectfail)
      lp = log_params.clone
      lp[:ssl_exception] = ex
      @logger.on_ssl_connectfail(lp)
    end
    #
    raise # Reraise
  end
end
open_tcp_socket() click to toggle source

#open_tcp_socket opens a TCP socket.

# File lib/connection/netio.rb, line 187
def open_tcp_socket()
  tcp_socket = nil

  if @logger && @logger.respond_to?(:on_connecting)
    @logger.on_connecting(log_params)
  end

  Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
    tcp_socket = TCPSocket.open(@host, @port)
  end

  tcp_socket
end
parse_char(char) click to toggle source

Handle 1.9+ character representation.

# File lib/connection/utils.rb, line 39
def parse_char(char)
  RUBY_VERSION > '1.9' ? char : char.chr
end
refine_params(params) click to toggle source

#refine_params sets up defaults for a Hash initialize.

# File lib/connection/utils.rb, line 154
def refine_params(params)
  params = params.uncamelize_and_symbolize_keys
  default_params = {
    :connect_headers => {},
    :reliable => true,
    # Failover parameters
    :initial_reconnect_delay => 0.01,
    :max_reconnect_delay => 30.0,
    :use_exponential_back_off => true,
    :back_off_multiplier => 2,
    :max_reconnect_attempts => 0,
    :randomize => false,
    :connect_timeout => 0,
    # Parse Timeout
    :parse_timeout => 5,
    :dmh => false,
    # Closed check logic
    :closed_check => true,
    :hbser => false,
    :stompconn => false,
  }

  res_params = default_params.merge(params)
  if res_params[:dmh]
    res_params = _expand_hosts(res_params)
  end
  return res_params
end
socket() click to toggle source

socket creates and returns a new socket for use by the connection.

# File lib/connection/utils.rb, line 106
def socket()
  @socket_semaphore.synchronize do
    used_socket = @socket
    used_socket = nil if closed?

    while used_socket.nil? || !@failure.nil?
      @failure = nil
      begin
        used_socket = open_socket() # sets @closed = false if OK
        # Open is complete
        connect(used_socket)
        if @logger && @logger.respond_to?(:on_connected)
          @logger.on_connected(log_params)
        end
        @connection_attempts = 0
      rescue
        @failure = $!
        used_socket = nil
        raise unless @reliable
        raise if @failure.is_a?(Stomp::Error::LoggerConnectionError)
        @closed = true
        if @logger && @logger.respond_to?(:on_connectfail)
          # on_connectfail may raise
          begin
            @logger.on_connectfail(log_params)
          rescue Exception => aex
            raise if aex.is_a?(Stomp::Error::LoggerConnectionError)
          end
        else
          $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
        end
        raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts?

        sleep(@reconnect_delay)

        @connection_attempts += 1

        if @parameters
          change_host()
          increase_reconnect_delay()
        end
      end
    end
    @socket = used_socket
  end
end
transmit(command, headers = {}, body = '') click to toggle source

transmit logically puts a Message on the wire.

# File lib/connection/netio.rb, line 108
def transmit(command, headers = {}, body = '')
  # The transmit may fail so we may need to retry.
  while TRUE
    begin
      used_socket = socket()
      _transmit(used_socket, command, headers, body)
      return
    rescue Stomp::Error::MaxReconnectAttempts => e
      raise
    rescue
      @failure = $!
      raise unless @reliable
      errstr = "transmit to #{@host} failed: #{$!}\n"
      if @logger && @logger.respond_to?(:on_miscerr)
        @logger.on_miscerr(log_params, "es_trans: " + errstr)
      else
        $stderr.print errstr
      end
    end
  end
end