class Stomp::Client

Typical Stomp client class. Uses a listener thread to receive frames from the server; any thread can send.

All receives happen in one thread, so avoid heavy processing in that thread if you have a high message volume.

Attributes

host[R]
login[R]
parameters[R]
passcode[R]
port[R]
reliable[R]

Public Class Methods

new(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) click to toggle source

A new Client object can be initialized using two forms:

Standard positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)

e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
# File lib/stomp/client.rb, line 37
# Build a client, create its Connection and call start_listeners.
#
# The first argument decides how the settings are obtained:
#
# * Hash - kept as @parameters and handed to Connection.new unchanged;
#   login, passcode, host and port are read from the first entry of its
#   :hosts array. The client is marked reliable.
# * 'stomp://...' URL - login, passcode, host and port come from the URL;
#   the client is not reliable.
# * 'failover:(stomp://...,stomp://...)?options' URL - a parameters hash is
#   built from the listed hosts and the filtered query options. The client
#   is marked reliable.
# * anything else - the five positional arguments are used as given
#   (port is converted with to_i).
#
# check_arguments! is called once the instance variables are set.
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

  # NOTE(review): the capture numbers used below assume url_regex contributes
  # five groups (credentials, login, passcode, host, port) - confirm against
  # its definition before changing either pattern.

  # Parse stomp:// URL's or set params
  if login.is_a?(Hash)
    @parameters = login

    first_host = @parameters[:hosts][0]

    @login = first_host[:login]
    @passcode = first_host[:passcode]
    @host = first_host[:host]
    @port = first_host[:port] || Connection::default_port(first_host[:ssl])

    @reliable = true

  elsif login =~ /^stomp:\/\/#{url_regex}/ # e.g. stomp://login:passcode@host:port or stomp://host:port
    @login = $2 || ""
    @passcode = $3 || ""
    @host = $4
    @port = $5.to_i
    @reliable = false
  elsif login =~ /^failover:(\/\/)?\(stomp(\+ssl)?:\/\/#{url_regex}(,stomp(\+ssl)?:\/\/#{url_regex}\))+(\?(.*))?$/ # e.g. failover://(stomp://login1:passcode1@localhost:61616,stomp://login2:passcode2@remotehost:61617)?option1=param

    first_host = {}
    first_host[:ssl] = !$2.nil?
    @login = first_host[:login] = $4 || ""
    @passcode = first_host[:passcode] = $5 || ""
    @host = first_host[:host] = $6
    # to_i never returns nil, so the capture itself must be tested before it
    # is converted; otherwise the default-port fallback could never run.
    @port = first_host[:port] = $7 ? $7.to_i : Connection::default_port(first_host[:ssl])

    # Query string "a=1&b=2" becomes { "a" => "1", "b" => "2" }
    options = $16 || ""
    parts = options.split(/&|=/)
    options = Hash[*parts]

    hosts = [first_host] + parse_hosts(login)

    @parameters = {}
    @parameters[:hosts] = hosts

    @parameters.merge! filter_options(options)

    @reliable = true
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  check_arguments!

  @id_mutex = Mutex.new
  @ids = 1

  # A parameters hash (Hash or failover form) takes precedence over the
  # individual connection settings.
  if @parameters
    @connection = Connection.new(@parameters)
  else
    @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
  end

  start_listeners

end
open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) click to toggle source

Syntactic sugar for '::new'. See 'initialize' for usage.

# File lib/stomp/client.rb, line 103
# Syntactic sugar for Client.new: the five arguments are passed through
# unchanged and the new Client is returned. See 'initialize' for usage.
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  settings = [login, passcode, host, port, reliable]
  Client.new(*settings)
end

Public Instance Methods

abort(name, headers = {}) click to toggle source

Abort a transaction by name

# File lib/stomp/client.rb, line 119
# Abort a transaction by name.
#
# The abort is sent through the connection first; afterwards every message
# that was acknowledged inside the transaction is handed back to its
# listener (messages for which find_listener returns nothing are skipped).
#
# name    - transaction name, also the key of the replay list
# headers - extra headers passed to the connection's abort
def abort(name, headers = {})
  @connection.abort(name, headers)

  # Take the list out of the table before replaying: the transaction is over,
  # so keeping it would leak the messages and replay them again on a later
  # abort of the same name. It also means a listener that acknowledges under
  # the same transaction name appends to a fresh list, not the one being
  # iterated here.
  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  replay_list.each do |message|
    listener = find_listener(message)
    listener.call(message) if listener
  end
end
acknowledge(message, headers = {}) { |r| ... } click to toggle source

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/client.rb, line 168
# Acknowledge a message through the connection, using the message's
# 'message-id' header.
#
# message - received message; its headers['message-id'] is acked
# headers - extra headers; :transaction => 'some_transaction_id' is accepted
#
# When a block is given, a receipt listener that yields the receipt to the
# block is registered and its id is stored in headers['receipt'].
def acknowledge(message, headers = {})
  transaction = headers[:transaction]
  # Messages ack'd inside a transaction are remembered so they can be
  # replayed if the transaction is rolled back.
  (@replay_messages_by_txn[transaction] ||= []) << message if transaction

  headers['receipt'] = register_receipt_listener(lambda { |r| yield r }) if block_given?

  @connection.ack(message.headers['message-id'], headers)
end
begin(name, headers = {}) click to toggle source

Begin a transaction by name

# File lib/stomp/client.rb, line 114
# Begin a transaction by name.
#
# name    - transaction name
# headers - extra headers, forwarded as given
#
# Returns whatever the connection's begin returns.
def begin(name, headers = {})
  @connection.begin name, headers
end
close(headers={}) click to toggle source

Close out resources in use by this client

# File lib/stomp/client.rb, line 232
# Close out resources in use by this client: stop the listener thread (when
# there is one) and disconnect the connection.
#
# headers - headers forwarded to the connection's disconnect
#
# Returns whatever the connection's disconnect returns.
def close(headers = {})
  # The listener thread may be missing (running allows for that as well);
  # a missing thread must not prevent the disconnect.
  @listener_thread.exit if @listener_thread
  @connection.disconnect(headers)
end
closed?() click to toggle source

Is this client closed?

# File lib/stomp/client.rb, line 227
# Is this client closed?
#
# Returns the answer of the underlying connection's closed?.
def closed?
  @connection.closed?()
end
commit(name, headers = {}) click to toggle source

Commit a transaction by name

# File lib/stomp/client.rb, line 134
# Commit a transaction by name.
#
# Messages acknowledged inside the transaction no longer need to be kept for
# replay, so the transaction's replay list is dropped before the commit is
# sent through the connection.
#
# name    - transaction name
# headers - extra headers, forwarded as given
#
# Returns whatever the connection's commit returns.
def commit(name, headers = {})
  # abort looks the replay list up by transaction name, so it has to be
  # released under that name; headers[:transaction] alone is normally absent
  # here and would leave the list behind.
  @replay_messages_by_txn.delete(name)

  # Still honour an explicit :transaction header, as before.
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id) unless txn_id.nil?

  @connection.commit(name, headers)
end
connection_frame() click to toggle source
# File lib/stomp/client.rb, line 213
# Returns the connection_frame of the underlying connection.
def connection_frame
  @connection.connection_frame()
end
disconnect_receipt() click to toggle source
# File lib/stomp/client.rb, line 217
# Returns the disconnect_receipt of the underlying connection.
def disconnect_receipt
  @connection.disconnect_receipt()
end
join(limit = nil) click to toggle source

Join the listener thread for this client, generally used to wait for a quit signal

# File lib/stomp/client.rb, line 109
# Join the listener thread for this client, generally used to wait for a
# quit signal.
#
# limit - passed straight to the listener thread's join (default nil)
#
# Returns whatever that join returns.
def join(limit = nil)
  @listener_thread.join limit
end
nack(message_id, headers = {}) click to toggle source

Stomp 1.1+ NACK

# File lib/stomp/client.rb, line 186
# Stomp 1.1+ NACK.
#
# message_id - id of the message being refused
# headers    - extra headers, forwarded as given
#
# Returns whatever the connection's nack returns.
def nack(message_id, headers = {})
  @connection.nack(message_id, headers)
end
obj_send(*args) click to toggle source
# File lib/stomp/client.rb, line 209
# Invoke a method on this object by name via __send__.
#
# args  - method name followed by its arguments
# block - optional block, handed on to the invoked method so that
#         block-taking methods can be reached this way too
#
# Returns whatever the invoked method returns.
def obj_send(*args, &block)
  __send__(*args, &block)
end
open?() click to toggle source

Is this client open?

# File lib/stomp/client.rb, line 222
# Is this client open?
#
# Returns the answer of the underlying connection's open?.
def open?
  @connection.open?()
end
protocol() click to toggle source

Convenience method

# File lib/stomp/client.rb, line 248
# Convenience method: returns the protocol reported by the underlying
# connection.
def protocol
  @connection.protocol
end
publish(destination, message, headers = {}) { |r| ... } click to toggle source

Publishes message to destination

If a block is given a receipt will be requested and passed to the block on receipt

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/client.rb, line 202
# Publishes message to destination through the connection.
#
# destination - where the message is sent
# message     - the message body
# headers     - extra headers; :transaction => 'some_transaction_id' is
#               accepted
#
# When a block is given, a receipt listener that yields the receipt to the
# block is registered and its id is stored in headers['receipt'].
def publish(destination, message, headers = {})
  headers['receipt'] = register_receipt_listener(lambda { |r| yield r }) if block_given?
  @connection.publish destination, message, headers
end
running() click to toggle source

Check if the thread was created and isn't dead

# File lib/stomp/client.rb, line 238
# Check if the listener thread was created and isn't dead.
#
# Returns @listener_thread itself when it is nil/false, otherwise true or
# false depending on whether the thread's status is truthy.
def running
  thread = @listener_thread
  return thread unless thread

  thread.status ? true : false
end
set_logger(logger) click to toggle source

Convenience method

# File lib/stomp/client.rb, line 243
# Convenience method: hands logger to the underlying connection's
# set_logger and returns its result.
def set_logger(logger)
  @connection.set_logger logger
end
sha1(data) click to toggle source

Convenience method for clients

# File lib/stomp/client.rb, line 258
# Convenience method for clients: returns what the underlying connection's
# sha1 returns for data.
def sha1(data)
  @connection.sha1 data
end
subscribe(destination, headers = {}) { |msg| ... } click to toggle source

Subscribe to a destination, must be passed a block which will be used as a callback listener

Accepts a transaction header ( :transaction => 'some_transaction_id' )

# File lib/stomp/client.rb, line 144
# Subscribe to a destination. A block is required; it becomes the callback
# listener for the subscription.
#
# destination - what to subscribe to
# headers     - extra headers; :transaction => 'some_transaction_id' is
#               accepted, and :id names the subscription
#
# Raises RuntimeError when no block is given or when a listener is already
# registered under the subscription id.
def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?

  # The subscription id correlates incoming messages with this subscription
  # (see the SUBSCRIPTION section of the protocol: http://stomp.github.com/).
  # One is generated when the caller did not supply it.
  set_subscription_id_if_missing(destination, headers)
  subscription_id = headers[:id]

  raise "attempting to subscribe to a queue with a previous subscription" if @listeners[subscription_id]

  @listeners[subscription_id] = lambda { |msg| yield msg }
  @connection.subscribe(destination, headers)
end
unreceive(message, options = {}) click to toggle source

Unreceive a message, sending it back to its queue or to the DLQ

# File lib/stomp/client.rb, line 192
# Unreceive a message, sending it back to its queue or to the DLQ.
#
# message - the message to give back
# options - forwarded as given to the connection's unreceive
#
# Returns whatever the connection's unreceive returns.
def unreceive(message, options = {})
  @connection.unreceive message, options
end
unsubscribe(name, headers = {}) click to toggle source

Unsubscribe from a channel

# File lib/stomp/client.rb, line 158
# Unsubscribe from a channel.
#
# name    - the destination that was subscribed to
# headers - extra headers; :id names the subscription and is filled in by
#           set_subscription_id_if_missing when absent
#
# After the connection has unsubscribed, the listener slot for the
# subscription id is set to nil.
def unsubscribe(name, headers = {})
  set_subscription_id_if_missing(name, headers)
  @connection.unsubscribe(name, headers)

  subscription_id = headers[:id]
  @listeners[subscription_id] = nil
end
uuid() click to toggle source

Convenience method for clients

# File lib/stomp/client.rb, line 263
# Convenience method for clients: returns what the underlying connection's
# uuid returns.
def uuid
  @connection.uuid
end
valid_utf8?(s) click to toggle source

Convenience method

# File lib/stomp/client.rb, line 253
# Convenience method: returns the result of the underlying connection's
# valid_utf8? check for s.
def valid_utf8?(s)
  @connection.valid_utf8? s
end