Package proton :: Module reactor
[frames] | no frames]

Source Code for Module proton.reactor

  1  from __future__ import absolute_import 
  2  # 
  3  # Licensed to the Apache Software Foundation (ASF) under one 
  4  # or more contributor license agreements.  See the NOTICE file 
  5  # distributed with this work for additional information 
  6  # regarding copyright ownership.  The ASF licenses this file 
  7  # to you under the Apache License, Version 2.0 (the 
  8  # "License"); you may not use this file except in compliance 
  9  # with the License.  You may obtain a copy of the License at 
 10  # 
 11  #   http://www.apache.org/licenses/LICENSE-2.0 
 12  # 
 13  # Unless required by applicable law or agreed to in writing, 
 14  # software distributed under the License is distributed on an 
 15  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 16  # KIND, either express or implied.  See the License for the 
 17  # specific language governing permissions and limitations 
 18  # under the License. 
 19  # 
 20  import logging, os, socket, time, types 
 21  from heapq import heappush, heappop, nsmallest 
 22  from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch 
 23  from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message 
 24  from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol 
 25  from proton import Terminus, Timeout, Transport, TransportException, ulong, Url 
 26  from select import select 
 27  from proton.handlers import OutgoingMessageHandler 
 28  from proton import unicode2utf8, utf82unicode 
 29   
 30  import traceback 
 31  from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable 
 32  from .wrapper import Wrapper, PYCTX 
 33  from cproton import * 
 34  from . import _compat 
 35   
 36  try: 
 37      import Queue 
 38  except ImportError: 
 39      import queue as Queue 
40 41 -class Task(Wrapper):
42 43 @staticmethod
44 - def wrap(impl):
45 if impl is None: 46 return None 47 else: 48 return Task(impl)
49
50 - def __init__(self, impl):
51 Wrapper.__init__(self, impl, pn_task_attachments)
52
53 - def _init(self):
54 pass
55
56 - def cancel(self):
57 pn_task_cancel(self._impl)
58
59 -class Acceptor(Wrapper):
60
61 - def __init__(self, impl):
62 Wrapper.__init__(self, impl)
63
64 - def set_ssl_domain(self, ssl_domain):
65 pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
66
67 - def close(self):
68 pn_acceptor_close(self._impl)
69
70 -class Reactor(Wrapper):
71 72 @staticmethod
73 - def wrap(impl):
74 if impl is None: 75 return None 76 else: 77 record = pn_reactor_attachments(impl) 78 attrs = pn_void2py(pn_record_get(record, PYCTX)) 79 if attrs and 'subclass' in attrs: 80 return attrs['subclass'](impl=impl) 81 else: 82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
85 Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) 86 for h in handlers: 87 self.handler.add(h)
88
89 - def _init(self):
90 self.errors = []
91
92 - def on_error(self, info):
93 self.errors.append(info) 94 self.yield_()
95
96 - def _get_global(self):
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
99 - def _set_global(self, handler):
100 impl = _chandler(handler, self.on_error) 101 pn_reactor_set_global_handler(self._impl, impl) 102 pn_decref(impl)
103 104 global_handler = property(_get_global, _set_global) 105
106 - def _get_timeout(self):
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
109 - def _set_timeout(self, secs):
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111 112 timeout = property(_get_timeout, _set_timeout) 113
114 - def yield_(self):
115 pn_reactor_yield(self._impl)
116
117 - def mark(self):
118 return pn_reactor_mark(self._impl)
119
120 - def _get_handler(self):
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
123 - def _set_handler(self, handler):
124 impl = _chandler(handler, self.on_error) 125 pn_reactor_set_handler(self._impl, impl) 126 pn_decref(impl)
127 128 handler = property(_get_handler, _set_handler) 129
130 - def run(self):
131 self.timeout = 3.14159265359 132 self.start() 133 while self.process(): pass 134 self.stop()
135
136 - def wakeup(self):
137 n = pn_reactor_wakeup(self._impl) 138 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
139
140 - def start(self):
141 pn_reactor_start(self._impl)
142 143 @property
144 - def quiesced(self):
145 return pn_reactor_quiesced(self._impl)
146
147 - def _check_errors(self):
148 if self.errors: 149 for exc, value, tb in self.errors[:-1]: 150 traceback.print_exception(exc, value, tb) 151 exc, value, tb = self.errors[-1] 152 _compat.raise_(exc, value, tb)
153
154 - def process(self):
155 result = pn_reactor_process(self._impl) 156 self._check_errors() 157 return result
158
159 - def stop(self):
160 pn_reactor_stop(self._impl) 161 self._check_errors()
162
163 - def schedule(self, delay, task):
164 impl = _chandler(task, self.on_error) 165 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) 166 pn_decref(impl) 167 return task
168
169 - def acceptor(self, host, port, handler=None):
170 impl = _chandler(handler, self.on_error) 171 aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) 172 pn_decref(impl) 173 if aimpl: 174 return Acceptor(aimpl) 175 else: 176 raise IOError("%s (%s:%s)" % (pn_error_text(pn_io_error(pn_reactor_io(self._impl))), host, port))
177
178 - def connection(self, handler=None):
179 impl = _chandler(handler, self.on_error) 180 result = Connection.wrap(pn_reactor_connection(self._impl, impl)) 181 pn_decref(impl) 182 return result
183
184 - def selectable(self, handler=None):
185 impl = _chandler(handler, self.on_error) 186 result = Selectable.wrap(pn_reactor_selectable(self._impl)) 187 if impl: 188 record = pn_selectable_attachments(result._impl) 189 pn_record_set_handler(record, impl) 190 pn_decref(impl) 191 return result
192
193 - def update(self, sel):
194 pn_reactor_update(self._impl, sel._impl)
195
196 - def push_event(self, obj, etype):
197 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
198 199 from proton import wrappers as _wrappers 200 _wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) 201 _wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
202 203 204 -class EventInjector(object):
205 """ 206 Can be added to a reactor to allow events to be triggered by an 207 external thread but handled on the event thread associated with 208 the reactor. An instance of this class can be passed to the 209 Reactor.selectable() method of the reactor in order to activate 210 it. The close() method should be called when it is no longer 211 needed, to allow the event loop to end if needed. 212 """
213 - def __init__(self):
214 self.queue = Queue.Queue() 215 self.pipe = os.pipe() 216 self._closed = False
217
218 - def trigger(self, event):
219 """ 220 Request that the given event be dispatched on the event thread 221 of the reactor to which this EventInjector was added. 222 """ 223 self.queue.put(event) 224 os.write(self.pipe[1], _compat.str2bin("!"))
225
226 - def close(self):
227 """ 228 Request that this EventInjector be closed. Existing events 229 will be dispctahed on the reactors event dispactch thread, 230 then this will be removed from the set of interest. 231 """ 232 self._closed = True 233 os.write(self.pipe[1], _compat.str2bin("!"))
234
235 - def fileno(self):
236 return self.pipe[0]
237
238 - def on_selectable_init(self, event):
239 sel = event.context 240 sel.fileno(self.fileno()) 241 sel.reading = True 242 event.reactor.update(sel)
243
244 - def on_selectable_readable(self, event):
245 os.read(self.pipe[0], 512) 246 while not self.queue.empty(): 247 requested = self.queue.get() 248 event.reactor.push_event(requested.context, requested.type) 249 if self._closed: 250 s = event.context 251 s.terminate() 252 event.reactor.update(s)
253
254 255 -class ApplicationEvent(EventBase):
256 """ 257 Application defined event, which can optionally be associated with 258 an engine object and or an arbitrary subject 259 """
260 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
261 super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) 262 self.connection = connection 263 self.session = session 264 self.link = link 265 self.delivery = delivery 266 if self.delivery: 267 self.link = self.delivery.link 268 if self.link: 269 self.session = self.link.session 270 if self.session: 271 self.connection = self.session.connection 272 self.subject = subject
273
274 - def __repr__(self):
275 objects = [self.connection, self.session, self.link, self.delivery, self.subject] 276 return "%s(%s)" % (typename, ", ".join([str(o) for o in objects if o is not None]))
277
278 -class Transaction(object):
279 """ 280 Class to track state of an AMQP 1.0 transaction. 281 """
282 - def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
283 self.txn_ctrl = txn_ctrl 284 self.handler = handler 285 self.id = None 286 self._declare = None 287 self._discharge = None 288 self.failed = False 289 self._pending = [] 290 self.settle_before_discharge = settle_before_discharge 291 self.declare()
292
293 - def commit(self):
294 self.discharge(False)
295
296 - def abort(self):
297 self.discharge(True)
298
299 - def declare(self):
300 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
301
302 - def discharge(self, failed):
303 self.failed = failed 304 self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed])
305
306 - def _send_ctrl(self, descriptor, value):
307 delivery = self.txn_ctrl.send(Message(body=Described(descriptor, value))) 308 delivery.transaction = self 309 return delivery
310
311 - def send(self, sender, msg, tag=None):
312 dlv = sender.send(msg, tag=tag) 313 dlv.local.data = [self.id] 314 dlv.update(0x34) 315 return dlv
316
317 - def accept(self, delivery):
318 self.update(delivery, PN_ACCEPTED) 319 if self.settle_before_discharge: 320 delivery.settle() 321 else: 322 self._pending.append(delivery)
323
324 - def update(self, delivery, state=None):
325 if state: 326 delivery.local.data = [self.id, Described(ulong(state), [])] 327 delivery.update(0x34)
328
329 - def _release_pending(self):
330 for d in self._pending: 331 d.update(Delivery.RELEASED) 332 d.settle() 333 self._clear_pending()
334
335 - def _clear_pending(self):
336 self._pending = []
337
338 - def handle_outcome(self, event):
339 if event.delivery == self._declare: 340 if event.delivery.remote.data: 341 self.id = event.delivery.remote.data[0] 342 self.handler.on_transaction_declared(event) 343 elif event.delivery.remote_state == Delivery.REJECTED: 344 self.handler.on_transaction_declare_failed(event) 345 else: 346 logging.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) 347 self.handler.on_transaction_declare_failed(event) 348 elif event.delivery == self._discharge: 349 if event.delivery.remote_state == Delivery.REJECTED: 350 if not self.failed: 351 self.handler.on_transaction_commit_failed(event) 352 self._release_pending() # make this optional? 353 else: 354 if self.failed: 355 self.handler.on_transaction_aborted(event) 356 self._release_pending() 357 else: 358 self.handler.on_transaction_committed(event) 359 self._clear_pending()
360
361 -class LinkOption(object):
362 """ 363 Abstract interface for link configuration options 364 """
365 - def apply(self, link):
366 """ 367 Subclasses will implement any configuration logic in this 368 method 369 """ 370 pass
371 - def test(self, link):
372 """ 373 Subclasses can override this to selectively apply an option 374 e.g. based on some link criteria 375 """ 376 return True
377
378 -class AtMostOnce(LinkOption):
379 - def apply(self, link):
381
382 -class AtLeastOnce(LinkOption):
383 - def apply(self, link):
386
387 -class SenderOption(LinkOption):
388 - def apply(self, sender): pass
389 - def test(self, link): return link.is_sender
390
391 -class ReceiverOption(LinkOption):
392 - def apply(self, receiver): pass
393 - def test(self, link): return link.is_receiver
394
395 -class DynamicNodeProperties(LinkOption):
396 - def __init__(self, props={}):
397 self.properties = {} 398 for k in props: 399 if isinstance(k, symbol): 400 self.properties[k] = props[k] 401 else: 402 self.properties[symbol(k)] = props[k]
403
404 - def apply(self, link):
409
410 -class Filter(ReceiverOption):
411 - def __init__(self, filter_set={}):
412 self.filter_set = filter_set
413
414 - def apply(self, receiver):
415 receiver.source.filter.put_dict(self.filter_set)
416
417 -class Selector(Filter):
418 """ 419 Configures a link with a message selector filter 420 """
421 - def __init__(self, value, name='selector'):
422 super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)})
423
424 -class DurableSubscription(ReceiverOption):
425 - def apply(self, receiver):
428
429 -class Move(ReceiverOption):
430 - def apply(self, receiver):
432
433 -class Copy(ReceiverOption):
434 - def apply(self, receiver):
436 444
445 -def _create_session(connection, handler=None):
446 session = connection.session() 447 session.open() 448 return session
449
450 451 -def _get_attr(target, name):
452 if hasattr(target, name): 453 return getattr(target, name) 454 else: 455 return None
456
457 -class SessionPerConnection(object):
458 - def __init__(self):
459 self._default_session = None
460
461 - def session(self, connection):
462 if not self._default_session: 463 self._default_session = _create_session(connection) 464 self._default_session.context = self 465 return self._default_session
466
467 - def on_session_remote_close(self, event):
468 event.connection.close() 469 self._default_session = None
470
471 -class GlobalOverrides(object):
472 """ 473 Internal handler that triggers the necessary socket connect for an 474 opened connection. 475 """
476 - def __init__(self, base):
477 self.base = base
478
479 - def on_unhandled(self, name, event):
480 if not self._override(event): 481 event.dispatch(self.base)
482
483 - def _override(self, event):
484 conn = event.connection 485 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
486
487 -class Connector(Handler):
488 """ 489 Internal handler that triggers the necessary socket connect for an 490 opened connection. 491 """
492 - def __init__(self, connection):
493 self.connection = connection 494 self.address = None 495 self.heartbeat = None 496 self.reconnect = None 497 self.ssl_domain = None 498 self.allow_insecure_mechs = True 499 self.allowed_mechs = None
500
501 - def _connect(self, connection):
502 url = self.address.next() 503 # IoHandler uses the hostname to determine where to try to connect to 504 connection.hostname = "%s:%s" % (url.host, url.port) 505 logging.info("connecting to %s..." % connection.hostname) 506 507 if url.username: 508 connection.user = url.username 509 if url.password: 510 connection.password = url.password 511 transport = Transport() 512 sasl = transport.sasl() 513 sasl.allow_insecure_mechs = self.allow_insecure_mechs 514 if self.allowed_mechs: 515 sasl.allowed_mechs(self.allowed_mechs) 516 transport.bind(connection) 517 if self.heartbeat: 518 transport.idle_timeout = self.heartbeat 519 if url.scheme == 'amqps' and self.ssl_domain: 520 self.ssl = SSL(transport, self.ssl_domain) 521 self.ssl.peer_hostname = url.host
522
523 - def on_connection_local_open(self, event):
524 self._connect(event.connection)
525
526 - def on_connection_remote_open(self, event):
527 logging.info("connected to %s" % event.connection.hostname) 528 if self.reconnect: 529 self.reconnect.reset() 530 self.transport = None
531
532 - def on_transport_tail_closed(self, event):
533 self.on_transport_closed(event)
534
535 - def on_transport_closed(self, event):
536 if self.connection and self.connection.state & Endpoint.LOCAL_ACTIVE: 537 if self.reconnect: 538 event.transport.unbind() 539 delay = self.reconnect.next() 540 if delay == 0: 541 logging.info("Disconnected, reconnecting...") 542 self._connect(self.connection) 543 else: 544 logging.info("Disconnected will try to reconnect after %s seconds" % delay) 545 event.reactor.schedule(delay, self) 546 else: 547 logging.info("Disconnected") 548 self.connection = None
549
550 - def on_timer_task(self, event):
551 self._connect(self.connection)
552
553 - def on_connection_remote_close(self, event):
554 self.connection = None
555
556 -class Backoff(object):
557 """ 558 A reconnect strategy involving an increasing delay between 559 retries, up to a maximum or 10 seconds. 560 """
561 - def __init__(self):
562 self.delay = 0
563
564 - def reset(self):
565 self.delay = 0
566
567 - def next(self):
568 current = self.delay 569 if current == 0: 570 self.delay = 0.1 571 else: 572 self.delay = min(10, 2*current) 573 return current
574
575 -class Urls(object):
576 - def __init__(self, values):
577 self.values = [Url(v) for v in values] 578 self.i = iter(self.values)
579
580 - def __iter__(self):
581 return self
582
583 - def next(self):
584 try: 585 return next(self.i) 586 except StopIteration: 587 self.i = iter(self.values) 588 return next(self.i)
589
590 -class SSLConfig(object):
591 - def __init__(self):
592 self.client = SSLDomain(SSLDomain.MODE_CLIENT) 593 self.server = SSLDomain(SSLDomain.MODE_SERVER)
594
595 - def set_credentials(self, cert_file, key_file, password):
596 self.client.set_credentials(cert_file, key_file, password) 597 self.server.set_credentials(cert_file, key_file, password)
598
599 - def set_trusted_ca_db(self, certificate_db):
600 self.client.set_trusted_ca_db(certificate_db) 601 self.server.set_trusted_ca_db(certificate_db)
602
603 604 -class Container(Reactor):
605 """A representation of the AMQP concept of a 'container', which 606 lossely speaking is something that establishes links to or from 607 another container, over which messages are transfered. This is 608 an extension to the Reactor class that adds convenience methods 609 for creating connections and sender- or receiver- links. 610 """
611 - def __init__(self, *handlers, **kwargs):
612 super(Container, self).__init__(*handlers, **kwargs) 613 if "impl" not in kwargs: 614 try: 615 self.ssl = SSLConfig() 616 except SSLUnavailable: 617 self.ssl = None 618 self.global_handler = GlobalOverrides(kwargs.get('global_handler', self.global_handler)) 619 self.trigger = None 620 self.container_id = str(generate_uuid()) 621 self.allow_insecure_mechs = True 622 self.allowed_mechs = None 623 Wrapper.__setattr__(self, 'subclass', self.__class__)
624
625 - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None):
626 """ 627 Initiates the establishment of an AMQP connection. Returns an 628 instance of proton.Connection. 629 """ 630 conn = self.connection(handler) 631 conn.container = self.container_id or str(generate_uuid()) 632 633 connector = Connector(conn) 634 connector.allow_insecure_mechs = self.allow_insecure_mechs 635 connector.allowed_mechs = self.allowed_mechs 636 conn._overrides = connector 637 if url: connector.address = Urls([url]) 638 elif urls: connector.address = Urls(urls) 639 elif address: connector.address = address 640 else: raise ValueError("One of url, urls or address required") 641 if heartbeat: 642 connector.heartbeat = heartbeat 643 if reconnect: 644 connector.reconnect = reconnect 645 elif reconnect is None: 646 connector.reconnect = Backoff() 647 connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client) 648 conn._session_policy = SessionPerConnection() #todo: make configurable 649 conn.open() 650 return conn
651
652 - def _get_id(self, container, remote, local):
653 if local and remote: "%s-%s-%s" % (container, remote, local) 654 elif local: return "%s-%s" % (container, local) 655 elif remote: return "%s-%s" % (container, remote) 656 else: return "%s-%s" % (container, str(generate_uuid()))
657
658 - def _get_session(self, context):
659 if isinstance(context, Url): 660 return self._get_session(self.connect(url=context)) 661 elif isinstance(context, Session): 662 return context 663 elif isinstance(context, Connection): 664 if hasattr(context, '_session_policy'): 665 return context._session_policy.session(context) 666 else: 667 return _create_session(context) 668 else: 669 return context.session()
670
671 - def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
672 """ 673 Initiates the establishment of a link over which messages can 674 be sent. Returns an instance of proton.Sender. 675 676 There are two patterns of use. (1) A connection can be passed 677 as the first argument, in which case the link is established 678 on that connection. In this case the target address can be 679 specified as the second argument (or as a keyword 680 argument). The source address can also be specified if 681 desired. (2) Alternatively a URL can be passed as the first 682 argument. In this case a new connection will be establised on 683 which the link will be attached. If a path is specified and 684 the target is not, then the path of the URL is used as the 685 target address. 686 687 The name of the link may be specified if desired, otherwise a 688 unique name will be generated. 689 690 Various LinkOptions can be specified to further control the 691 attachment. 692 """ 693 if isinstance(context, _compat.STRING_TYPES): 694 context = Url(context) 695 if isinstance(context, Url) and not target: 696 target = context.path 697 session = self._get_session(context) 698 snd = session.sender(name or self._get_id(session.connection.container, target, source)) 699 if source: 700 snd.source.address = source 701 if target: 702 snd.target.address = target 703 if handler: 704 snd.handler = handler 705 if tags: 706 snd.tag_generator = tags 707 _apply_link_options(options, snd) 708 snd.open() 709 return snd
710
711 - def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
712 """ 713 Initiates the establishment of a link over which messages can 714 be received (aka a subscription). Returns an instance of 715 proton.Receiver. 716 717 There are two patterns of use. (1) A connection can be passed 718 as the first argument, in which case the link is established 719 on that connection. In this case the source address can be 720 specified as the second argument (or as a keyword 721 argument). The target address can also be specified if 722 desired. (2) Alternatively a URL can be passed as the first 723 argument. In this case a new connection will be establised on 724 which the link will be attached. If a path is specified and 725 the source is not, then the path of the URL is used as the 726 target address. 727 728 The name of the link may be specified if desired, otherwise a 729 unique name will be generated. 730 731 Various LinkOptions can be specified to further control the 732 attachment. 733 """ 734 if isinstance(context, _compat.STRING_TYPES): 735 context = Url(context) 736 if isinstance(context, Url) and not source: 737 source = context.path 738 session = self._get_session(context) 739 rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) 740 if source: 741 rcv.source.address = source 742 if dynamic: 743 rcv.source.dynamic = True 744 if target: 745 rcv.target.address = target 746 if handler: 747 rcv.handler = handler 748 _apply_link_options(options, rcv) 749 rcv.open() 750 return rcv
751
752 - def declare_transaction(self, context, handler=None, settle_before_discharge=False):
753 if not _get_attr(context, '_txn_ctrl'): 754 class InternalTransactionHandler(OutgoingMessageHandler): 755 def __init__(self): 756 super(InternalTransactionHandler, self).__init__(auto_settle=True)
757 758 def on_settled(self, event): 759 if hasattr(event.delivery, "transaction"): 760 event.transaction = event.delivery.transaction 761 event.delivery.transaction.handle_outcome(event)
762 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler()) 763 context._txn_ctrl.target.type = Terminus.COORDINATOR 764 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) 765 return Transaction(context._txn_ctrl, handler, settle_before_discharge) 766
767 - def listen(self, url, ssl_domain=None):
768 """ 769 Initiates a server socket, accepting incoming AMQP connections 770 on the interface and port specified. 771 """ 772 url = Url(url) 773 acceptor = self.acceptor(url.host, url.port) 774 ssl_config = ssl_domain 775 if not ssl_config and url.scheme == 'amqps' and self.ssl: 776 ssl_config = self.ssl.server 777 if ssl_config: 778 acceptor.set_ssl_domain(ssl_config) 779 return acceptor
780
781 - def do_work(self, timeout=None):
782 if timeout: 783 self.timeout = timeout 784 return self.process()
785