Package proton
[frames] | no frames]

Source Code for Package proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32  from __future__ import absolute_import 
  33   
  34  from cproton import * 
  35  from .wrapper import Wrapper 
  36  from . import _compat 
  37   
  38  import weakref, socket, sys, threading 
  39   
  40  try: 
  41    import uuid 
42 43 - def generate_uuid():
44 return uuid.uuid4()
45 46 except ImportError: 47 """ 48 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 49 """ 50 import struct
51 - class uuid:
52 - class UUID:
53 - def __init__(self, hex=None, bytes=None):
54 if [hex, bytes].count(None) != 1: 55 raise TypeError("need one of hex or bytes") 56 if bytes is not None: 57 self.bytes = bytes 58 elif hex is not None: 59 fields=hex.split("-") 60 fields[4:5] = [fields[4][:4], fields[4][4:]] 61 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
62
63 - def __cmp__(self, other):
64 if isinstance(other, uuid.UUID): 65 return cmp(self.bytes, other.bytes) 66 else: 67 return -1
68
69 - def __str__(self):
70 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
71
72 - def __repr__(self):
73 return "UUID(%r)" % str(self)
74
75 - def __hash__(self):
76 return self.bytes.__hash__()
77 78 import os, random, time 79 rand = random.Random() 80 rand.seed((os.getpid(), time.time(), socket.gethostname()))
81 - def random_uuid():
82 data = [rand.randint(0, 255) for i in xrange(16)] 83 84 # From RFC4122, the version bits are set to 0100 85 data[6] &= 0x0F 86 data[6] |= 0x40 87 88 # From RFC4122, the top two bits of byte 8 get set to 01 89 data[8] &= 0x3F 90 data[8] |= 0x80 91 return "".join(map(chr, data))
92
93 - def uuid4():
94 return uuid.UUID(bytes=random_uuid())
95
96 - def generate_uuid():
97 return uuid4()
98 99 # 100 # Hacks to provide Python2 <---> Python3 compatibility 101 # 102 try: 103 bytes() 104 except NameError: 105 bytes = str 106 try: 107 long() 108 except NameError: 109 long = int 110 try: 111 unicode() 112 except NameError: 113 unicode = str 114 115 116 VERSION_MAJOR = PN_VERSION_MAJOR 117 VERSION_MINOR = PN_VERSION_MINOR 118 API_LANGUAGE = "C" 119 IMPLEMENTATION_LANGUAGE = "C"
120 121 -class Constant(object):
122
123 - def __init__(self, name):
124 self.name = name
125
126 - def __repr__(self):
127 return self.name
128
129 -class ProtonException(Exception):
130 """ 131 The root of the proton exception hierarchy. All proton exception 132 classes derive from this exception. 133 """ 134 pass
135
136 -class Timeout(ProtonException):
137 """ 138 A timeout exception indicates that a blocking operation has timed 139 out. 140 """ 141 pass
142
143 -class Interrupt(ProtonException):
144 """ 145 An interrupt exception indicaes that a blocking operation was interrupted. 146 """ 147 pass
148
149 -class MessengerException(ProtonException):
150 """ 151 The root of the messenger exception hierarchy. All exceptions 152 generated by the messenger class derive from this exception. 153 """ 154 pass
155
156 -class MessageException(ProtonException):
157 """ 158 The MessageException class is the root of the message exception 159 hierarhcy. All exceptions generated by the Message class derive from 160 this exception. 161 """ 162 pass
163 164 EXCEPTIONS = { 165 PN_TIMEOUT: Timeout, 166 PN_INTR: Interrupt 167 } 168 169 PENDING = Constant("PENDING") 170 ACCEPTED = Constant("ACCEPTED") 171 REJECTED = Constant("REJECTED") 172 RELEASED = Constant("RELEASED") 173 MODIFIED = Constant("MODIFIED") 174 ABORTED = Constant("ABORTED") 175 SETTLED = Constant("SETTLED") 176 177 STATUSES = { 178 PN_STATUS_ABORTED: ABORTED, 179 PN_STATUS_ACCEPTED: ACCEPTED, 180 PN_STATUS_REJECTED: REJECTED, 181 PN_STATUS_RELEASED: RELEASED, 182 PN_STATUS_MODIFIED: MODIFIED, 183 PN_STATUS_PENDING: PENDING, 184 PN_STATUS_SETTLED: SETTLED, 185 PN_STATUS_UNKNOWN: None 186 } 187 188 AUTOMATIC = Constant("AUTOMATIC") 189 MANUAL = Constant("MANUAL")
190 191 -class Messenger(object):
192 """ 193 The L{Messenger} class defines a high level interface for sending 194 and receiving L{Messages<Message>}. Every L{Messenger} contains a 195 single logical queue of incoming messages and a single logical queue 196 of outgoing messages. These messages in these queues may be destined 197 for, or originate from, a variety of addresses. 198 199 The messenger interface is single-threaded. All methods 200 except one (L{interrupt}) are intended to be used from within 201 the messenger thread. 202 203 204 Address Syntax 205 ============== 206 207 An address has the following form:: 208 209 [ amqp[s]:// ] [user[:password]@] domain [/[name]] 210 211 Where domain can be one of:: 212 213 host | host:port | ip | ip:port | name 214 215 The following are valid examples of addresses: 216 217 - example.org 218 - example.org:1234 219 - amqp://example.org 220 - amqps://example.org 221 - example.org/incoming 222 - amqps://example.org/outgoing 223 - amqps://fred:trustno1@example.org 224 - 127.0.0.1:1234 225 - amqps://127.0.0.1:1234 226 227 Sending & Receiving Messages 228 ============================ 229 230 The L{Messenger} class works in conjuction with the L{Message} class. The 231 L{Message} class is a mutable holder of message content. 232 233 The L{put} method copies its L{Message} to the outgoing queue, and may 234 send queued messages if it can do so without blocking. The L{send} 235 method blocks until it has sent the requested number of messages, 236 or until a timeout interrupts the attempt. 237 238 239 >>> message = Message() 240 >>> for i in range(3): 241 ... message.address = "amqp://host/queue" 242 ... message.subject = "Hello World %i" % i 243 ... messenger.put(message) 244 >>> messenger.send() 245 246 Similarly, the L{recv} method receives messages into the incoming 247 queue, and may block as it attempts to receive the requested number 248 of messages, or until timeout is reached. It may receive fewer 249 than the requested number. The L{get} method pops the 250 eldest L{Message} off the incoming queue and copies it into the L{Message} 251 object that you supply. It will not block. 252 253 254 >>> message = Message() 255 >>> messenger.recv(10): 256 >>> while messenger.incoming > 0: 257 ... messenger.get(message) 258 ... print message.subject 259 Hello World 0 260 Hello World 1 261 Hello World 2 262 263 The blocking flag allows you to turn off blocking behavior entirely, 264 in which case L{send} and L{recv} will do whatever they can without 265 blocking, and then return. You can then look at the number 266 of incoming and outgoing messages to see how much outstanding work 267 still remains. 268 """ 269
270 - def __init__(self, name=None):
271 """ 272 Construct a new L{Messenger} with the given name. The name has 273 global scope. If a NULL name is supplied, a UUID based name will 274 be chosen. 275 276 @type name: string 277 @param name: the name of the messenger or None 278 279 """ 280 self._mng = pn_messenger(name) 281 self._selectables = {}
282
283 - def __del__(self):
284 """ 285 Destroy the L{Messenger}. This will close all connections that 286 are managed by the L{Messenger}. Call the L{stop} method before 287 destroying the L{Messenger}. 288 """ 289 if hasattr(self, "_mng"): 290 pn_messenger_free(self._mng) 291 del self._mng
292
293 - def _check(self, err):
294 if err < 0: 295 if (err == PN_INPROGRESS): 296 return 297 exc = EXCEPTIONS.get(err, MessengerException) 298 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) 299 else: 300 return err
301 302 @property
303 - def name(self):
304 """ 305 The name of the L{Messenger}. 306 """ 307 return pn_messenger_name(self._mng)
308
309 - def _get_certificate(self):
310 return pn_messenger_get_certificate(self._mng)
311
312 - def _set_certificate(self, value):
313 self._check(pn_messenger_set_certificate(self._mng, value))
314 315 certificate = property(_get_certificate, _set_certificate, 316 doc=""" 317 Path to a certificate file for the L{Messenger}. This certificate is 318 used when the L{Messenger} accepts or establishes SSL/TLS connections. 319 This property must be specified for the L{Messenger} to accept 320 incoming SSL/TLS connections and to establish client authenticated 321 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS 322 connections do not require this property. 323 """) 324
325 - def _get_private_key(self):
326 return pn_messenger_get_private_key(self._mng)
327
328 - def _set_private_key(self, value):
329 self._check(pn_messenger_set_private_key(self._mng, value))
330 331 private_key = property(_get_private_key, _set_private_key, 332 doc=""" 333 Path to a private key file for the L{Messenger's<Messenger>} 334 certificate. This property must be specified for the L{Messenger} to 335 accept incoming SSL/TLS connections and to establish client 336 authenticated outgoing SSL/TLS connection. Non client authenticated 337 SSL/TLS connections do not require this property. 338 """) 339
340 - def _get_password(self):
341 return pn_messenger_get_password(self._mng)
342
343 - def _set_password(self, value):
344 self._check(pn_messenger_set_password(self._mng, value))
345 346 password = property(_get_password, _set_password, 347 doc=""" 348 This property contains the password for the L{Messenger.private_key} 349 file, or None if the file is not encrypted. 350 """) 351
352 - def _get_trusted_certificates(self):
353 return pn_messenger_get_trusted_certificates(self._mng)
354
355 - def _set_trusted_certificates(self, value):
356 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
357 358 trusted_certificates = property(_get_trusted_certificates, 359 _set_trusted_certificates, 360 doc=""" 361 A path to a database of trusted certificates for use in verifying the 362 peer on an SSL/TLS connection. If this property is None, then the peer 363 will not be verified. 364 """) 365
366 - def _get_timeout(self):
367 t = pn_messenger_get_timeout(self._mng) 368 if t == -1: 369 return None 370 else: 371 return millis2secs(t)
372
373 - def _set_timeout(self, value):
374 if value is None: 375 t = -1 376 else: 377 t = secs2millis(value) 378 self._check(pn_messenger_set_timeout(self._mng, t))
379 380 timeout = property(_get_timeout, _set_timeout, 381 doc=""" 382 The timeout property contains the default timeout for blocking 383 operations performed by the L{Messenger}. 384 """) 385
386 - def _is_blocking(self):
387 return pn_messenger_is_blocking(self._mng)
388
389 - def _set_blocking(self, b):
390 self._check(pn_messenger_set_blocking(self._mng, b))
391 392 blocking = property(_is_blocking, _set_blocking, 393 doc=""" 394 Enable or disable blocking behavior during L{Message} sending 395 and receiving. This affects every blocking call, with the 396 exception of L{work}. Currently, the affected calls are 397 L{send}, L{recv}, and L{stop}. 398 """) 399
400 - def _is_passive(self):
401 return pn_messenger_is_passive(self._mng)
402
403 - def _set_passive(self, b):
404 self._check(pn_messenger_set_passive(self._mng, b))
405 406 passive = property(_is_passive, _set_passive, 407 doc=""" 408 When passive is set to true, Messenger will not attempt to perform I/O 409 internally. In this mode it is necessary to use the selectables API to 410 drive any I/O needed to perform requested actions. In this mode 411 Messenger will never block. 412 """) 413
414 - def _get_incoming_window(self):
415 return pn_messenger_get_incoming_window(self._mng)
416
417 - def _set_incoming_window(self, window):
418 self._check(pn_messenger_set_incoming_window(self._mng, window))
419 420 incoming_window = property(_get_incoming_window, _set_incoming_window, 421 doc=""" 422 The incoming tracking window for the messenger. The messenger will 423 track the remote status of this many incoming deliveries after they 424 have been accepted or rejected. Defaults to zero. 425 426 L{Messages<Message>} enter this window only when you take them into your application 427 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>} 428 without explicitly accepting or rejecting the oldest message, then the 429 message that passes beyond the edge of the incoming window will be assigned 430 the default disposition of its link. 431 """) 432
433 - def _get_outgoing_window(self):
434 return pn_messenger_get_outgoing_window(self._mng)
435
436 - def _set_outgoing_window(self, window):
437 self._check(pn_messenger_set_outgoing_window(self._mng, window))
438 439 outgoing_window = property(_get_outgoing_window, _set_outgoing_window, 440 doc=""" 441 The outgoing tracking window for the messenger. The messenger will 442 track the remote status of this many outgoing deliveries after calling 443 send. Defaults to zero. 444 445 A L{Message} enters this window when you call the put() method with the 446 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1 447 times, status information will no longer be available for the 448 first message. 449 """) 450
451 - def start(self):
452 """ 453 Currently a no-op placeholder. 454 For future compatibility, do not L{send} or L{recv} messages 455 before starting the L{Messenger}. 456 """ 457 self._check(pn_messenger_start(self._mng))
458
459 - def stop(self):
460 """ 461 Transitions the L{Messenger} to an inactive state. An inactive 462 L{Messenger} will not send or receive messages from its internal 463 queues. A L{Messenger} should be stopped before being discarded to 464 ensure a clean shutdown handshake occurs on any internally managed 465 connections. 466 """ 467 self._check(pn_messenger_stop(self._mng))
468 469 @property
470 - def stopped(self):
471 """ 472 Returns true iff a L{Messenger} is in the stopped state. 473 This function does not block. 474 """ 475 return pn_messenger_stopped(self._mng)
476
477 - def subscribe(self, source):
478 """ 479 Subscribes the L{Messenger} to messages originating from the 480 specified source. The source is an address as specified in the 481 L{Messenger} introduction with the following addition. If the 482 domain portion of the address begins with the '~' character, the 483 L{Messenger} will interpret the domain as host/port, bind to it, 484 and listen for incoming messages. For example "~0.0.0.0", 485 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any 486 local interface and listen for incoming messages with the last 487 variant only permitting incoming SSL connections. 488 489 @type source: string 490 @param source: the source of messages to subscribe to 491 """ 492 sub_impl = pn_messenger_subscribe(self._mng, source) 493 if not sub_impl: 494 self._check(pn_error_code(pn_messenger_error(self._mng))) 495 raise MessengerException("Cannot subscribe to %s"%source) 496 return Subscription(sub_impl)
497
498 - def put(self, message):
499 """ 500 Places the content contained in the message onto the outgoing 501 queue of the L{Messenger}. This method will never block, however 502 it will send any unblocked L{Messages<Message>} in the outgoing 503 queue immediately and leave any blocked L{Messages<Message>} 504 remaining in the outgoing queue. The L{send} call may be used to 505 block until the outgoing queue is empty. The L{outgoing} property 506 may be used to check the depth of the outgoing queue. 507 508 When the content in a given L{Message} object is copied to the outgoing 509 message queue, you may then modify or discard the L{Message} object 510 without having any impact on the content in the outgoing queue. 511 512 This method returns an outgoing tracker for the L{Message}. The tracker 513 can be used to determine the delivery status of the L{Message}. 514 515 @type message: Message 516 @param message: the message to place in the outgoing queue 517 @return: a tracker 518 """ 519 message._pre_encode() 520 self._check(pn_messenger_put(self._mng, message._msg)) 521 return pn_messenger_outgoing_tracker(self._mng)
522
523 - def status(self, tracker):
524 """ 525 Gets the last known remote state of the delivery associated with 526 the given tracker. 527 528 @type tracker: tracker 529 @param tracker: the tracker whose status is to be retrieved 530 531 @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED 532 """ 533 disp = pn_messenger_status(self._mng, tracker); 534 return STATUSES.get(disp, disp)
535
536 - def buffered(self, tracker):
537 """ 538 Checks if the delivery associated with the given tracker is still 539 waiting to be sent. 540 541 @type tracker: tracker 542 @param tracker: the tracker whose status is to be retrieved 543 544 @return: true if delivery is still buffered 545 """ 546 return pn_messenger_buffered(self._mng, tracker);
547
548 - def settle(self, tracker=None):
549 """ 550 Frees a L{Messenger} from tracking the status associated with a given 551 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up 552 to the most recent will be settled. 553 """ 554 if tracker is None: 555 tracker = pn_messenger_outgoing_tracker(self._mng) 556 flags = PN_CUMULATIVE 557 else: 558 flags = 0 559 self._check(pn_messenger_settle(self._mng, tracker, flags))
560
561 - def send(self, n=-1):
562 """ 563 This call will block until the indicated number of L{messages<Message>} 564 have been sent, or until the operation times out. If n is -1 this call will 565 block until all outgoing L{messages<Message>} have been sent. If n is 0 then 566 this call will send whatever it can without blocking. 567 """ 568 self._check(pn_messenger_send(self._mng, n))
569
570 - def recv(self, n=None):
571 """ 572 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value 573 for I{n} is supplied, this call will receive as many L{messages<Message>} as it 574 can buffer internally. If the L{Messenger} is in blocking mode, this 575 call will block until at least one L{Message} is available in the 576 incoming queue. 577 """ 578 if n is None: 579 n = -1 580 self._check(pn_messenger_recv(self._mng, n))
581
582 - def work(self, timeout=None):
583 """ 584 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}. 585 This will block for the indicated timeout. 586 This method may also do I/O work other than sending and receiving 587 L{messages<Message>}. For example, closing connections after messenger.L{stop}() 588 has been called. 589 """ 590 if timeout is None: 591 t = -1 592 else: 593 t = secs2millis(timeout) 594 err = pn_messenger_work(self._mng, t) 595 if (err == PN_TIMEOUT): 596 return False 597 else: 598 self._check(err) 599 return True
600 601 @property
602 - def receiving(self):
603 return pn_messenger_receiving(self._mng)
604
605 - def interrupt(self):
606 """ 607 The L{Messenger} interface is single-threaded. 608 This is the only L{Messenger} function intended to be called 609 from outside of the L{Messenger} thread. 610 Call this from a non-messenger thread to interrupt 611 a L{Messenger} that is blocking. 612 This will cause any in-progress blocking call to throw 613 the L{Interrupt} exception. If there is no currently blocking 614 call, then the next blocking call will be affected, even if it 615 is within the same thread that interrupt was called from. 616 """ 617 self._check(pn_messenger_interrupt(self._mng))
618
619 - def get(self, message=None):
620 """ 621 Moves the message from the head of the incoming message queue into 622 the supplied message object. Any content in the message will be 623 overwritten. 624 625 A tracker for the incoming L{Message} is returned. The tracker can 626 later be used to communicate your acceptance or rejection of the 627 L{Message}. 628 629 If None is passed in for the L{Message} object, the L{Message} 630 popped from the head of the queue is discarded. 631 632 @type message: Message 633 @param message: the destination message object 634 @return: a tracker 635 """ 636 if message is None: 637 impl = None 638 else: 639 impl = message._msg 640 self._check(pn_messenger_get(self._mng, impl)) 641 if message is not None: 642 message._post_decode() 643 return pn_messenger_incoming_tracker(self._mng)
644
645 - def accept(self, tracker=None):
646 """ 647 Signal the sender that you have acted on the L{Message} 648 pointed to by the tracker. If no tracker is supplied, 649 then all messages that have been returned by the L{get} 650 method are accepted, except those that have already been 651 auto-settled by passing beyond your incoming window size. 652 653 @type tracker: tracker 654 @param tracker: a tracker as returned by get 655 """ 656 if tracker is None: 657 tracker = pn_messenger_incoming_tracker(self._mng) 658 flags = PN_CUMULATIVE 659 else: 660 flags = 0 661 self._check(pn_messenger_accept(self._mng, tracker, flags))
662
663 - def reject(self, tracker=None):
664 """ 665 Rejects the L{Message} indicated by the tracker. If no tracker 666 is supplied, all messages that have been returned by the L{get} 667 method are rejected, except those that have already been auto-settled 668 by passing beyond your outgoing window size. 669 670 @type tracker: tracker 671 @param tracker: a tracker as returned by get 672 """ 673 if tracker is None: 674 tracker = pn_messenger_incoming_tracker(self._mng) 675 flags = PN_CUMULATIVE 676 else: 677 flags = 0 678 self._check(pn_messenger_reject(self._mng, tracker, flags))
679 680 @property
681 - def outgoing(self):
682 """ 683 The outgoing queue depth. 684 """ 685 return pn_messenger_outgoing(self._mng)
686 687 @property
688 - def incoming(self):
689 """ 690 The incoming queue depth. 691 """ 692 return pn_messenger_incoming(self._mng)
693
694 - def route(self, pattern, address):
695 """ 696 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table. 697 698 The route procedure may be used to influence how a L{Messenger} will 699 internally treat a given address or class of addresses. Every call 700 to the route procedure will result in L{Messenger} appending a routing 701 rule to its internal routing table. 702 703 Whenever a L{Message} is presented to a L{Messenger} for delivery, it 704 will match the address of this message against the set of routing 705 rules in order. The first rule to match will be triggered, and 706 instead of routing based on the address presented in the message, 707 the L{Messenger} will route based on the address supplied in the rule. 708 709 The pattern matching syntax supports two types of matches, a '%' 710 will match any character except a '/', and a '*' will match any 711 character including a '/'. 712 713 A routing address is specified as a normal AMQP address, however it 714 may additionally use substitution variables from the pattern match 715 that triggered the rule. 716 717 Any message sent to "foo" will be routed to "amqp://foo.com": 718 719 >>> messenger.route("foo", "amqp://foo.com"); 720 721 Any message sent to "foobar" will be routed to 722 "amqp://foo.com/bar": 723 724 >>> messenger.route("foobar", "amqp://foo.com/bar"); 725 726 Any message sent to bar/<path> will be routed to the corresponding 727 path within the amqp://bar.com domain: 728 729 >>> messenger.route("bar/*", "amqp://bar.com/$1"); 730 731 Route all L{messages<Message>} over TLS: 732 733 >>> messenger.route("amqp:*", "amqps:$1") 734 735 Supply credentials for foo.com: 736 737 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); 738 739 Supply credentials for all domains: 740 741 >>> messenger.route("amqp://*", "amqp://user:password@$1"); 742 743 Route all addresses through a single proxy while preserving the 744 original destination: 745 746 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); 747 748 Route any address through a single broker: 749 750 >>> messenger.route("*", "amqp://user:password@broker/$1"); 751 """ 752 self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
753
754 - def rewrite(self, pattern, address):
755 """ 756 Similar to route(), except that the destination of 757 the L{Message} is determined before the message address is rewritten. 758 759 The outgoing address is only rewritten after routing has been 760 finalized. If a message has an outgoing address of 761 "amqp://0.0.0.0:5678", and a rewriting rule that changes its 762 outgoing address to "foo", it will still arrive at the peer that 763 is listening on "amqp://0.0.0.0:5678", but when it arrives there, 764 the receiver will see its outgoing address as "foo". 765 766 The default rewrite rule removes username and password from addresses 767 before they are transmitted. 768 """ 769 self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
770
771 - def selectable(self):
772 return Selectable.wrap(pn_messenger_selectable(self._mng))
773 774 @property
775 - def deadline(self):
776 tstamp = pn_messenger_deadline(self._mng) 777 if tstamp: 778 return millis2secs(tstamp) 779 else: 780 return None
781
782 -class Message(object):
783 """The L{Message} class is a mutable holder of message content. 784 785 @ivar instructions: delivery instructions for the message 786 @type instructions: dict 787 @ivar annotations: infrastructure defined message annotations 788 @type annotations: dict 789 @ivar properties: application defined message properties 790 @type properties: dict 791 @ivar body: message body 792 @type body: bytes | unicode | dict | list | int | long | float | UUID 793 """ 794 795 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 796
797 - def __init__(self, body=None, **kwargs):
798 """ 799 @param kwargs: Message property name/value pairs to initialise the Message 800 """ 801 self._msg = pn_message() 802 self._id = Data(pn_message_id(self._msg)) 803 self._correlation_id = Data(pn_message_correlation_id(self._msg)) 804 self.instructions = None 805 self.annotations = None 806 self.properties = None 807 self.body = body 808 for k,v in _compat.iteritems(kwargs): 809 getattr(self, k) # Raise exception if it's not a valid attribute. 810 setattr(self, k, v)
811
812 - def __del__(self):
813 if hasattr(self, "_msg"): 814 pn_message_free(self._msg) 815 del self._msg
816
817 - def _check(self, err):
818 if err < 0: 819 exc = EXCEPTIONS.get(err, MessageException) 820 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) 821 else: 822 return err
823
824 - def _pre_encode(self):
825 inst = Data(pn_message_instructions(self._msg)) 826 ann = Data(pn_message_annotations(self._msg)) 827 props = Data(pn_message_properties(self._msg)) 828 body = Data(pn_message_body(self._msg)) 829 830 inst.clear() 831 if self.instructions is not None: 832 inst.put_object(self.instructions) 833 ann.clear() 834 if self.annotations is not None: 835 ann.put_object(self.annotations) 836 props.clear() 837 if self.properties is not None: 838 props.put_object(self.properties) 839 body.clear() 840 if self.body is not None: 841 body.put_object(self.body)
842
843 - def _post_decode(self):
844 inst = Data(pn_message_instructions(self._msg)) 845 ann = Data(pn_message_annotations(self._msg)) 846 props = Data(pn_message_properties(self._msg)) 847 body = Data(pn_message_body(self._msg)) 848 849 if inst.next(): 850 self.instructions = inst.get_object() 851 else: 852 self.instructions = None 853 if ann.next(): 854 self.annotations = ann.get_object() 855 else: 856 self.annotations = None 857 if props.next(): 858 self.properties = props.get_object() 859 else: 860 self.properties = None 861 if body.next(): 862 self.body = body.get_object() 863 else: 864 self.body = None
865
866 - def clear(self):
867 """ 868 Clears the contents of the L{Message}. All fields will be reset to 869 their default values. 870 """ 871 pn_message_clear(self._msg) 872 self.instructions = None 873 self.annotations = None 874 self.properties = None 875 self.body = None
876
877 - def _is_inferred(self):
878 return pn_message_is_inferred(self._msg)
879
880 - def _set_inferred(self, value):
881 self._check(pn_message_set_inferred(self._msg, bool(value)))
882 883 inferred = property(_is_inferred, _set_inferred, doc=""" 884 The inferred flag for a message indicates how the message content 885 is encoded into AMQP sections. If inferred is true then binary and 886 list values in the body of the message will be encoded as AMQP DATA 887 and AMQP SEQUENCE sections, respectively. If inferred is false, 888 then all values in the body of the message will be encoded as AMQP 889 VALUE sections regardless of their type. 890 """) 891
892 - def _is_durable(self):
893 return pn_message_is_durable(self._msg)
894
895 - def _set_durable(self, value):
896 self._check(pn_message_set_durable(self._msg, bool(value)))
897 898 durable = property(_is_durable, _set_durable, 899 doc=""" 900 The durable property indicates that the message should be held durably 901 by any intermediaries taking responsibility for the message. 902 """) 903
904 - def _get_priority(self):
905 return pn_message_get_priority(self._msg)
906
907 - def _set_priority(self, value):
908 self._check(pn_message_set_priority(self._msg, value))
909 910 priority = property(_get_priority, _set_priority, 911 doc=""" 912 The priority of the message. 913 """) 914
915 - def _get_ttl(self):
916 return millis2secs(pn_message_get_ttl(self._msg))
917
918 - def _set_ttl(self, value):
919 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
920 921 ttl = property(_get_ttl, _set_ttl, 922 doc=""" 923 The time to live of the message measured in seconds. Expired messages 924 may be dropped. 925 """) 926
927 - def _is_first_acquirer(self):
928 return pn_message_is_first_acquirer(self._msg)
929
930 - def _set_first_acquirer(self, value):
931 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
932 933 first_acquirer = property(_is_first_acquirer, _set_first_acquirer, 934 doc=""" 935 True iff the recipient is the first to acquire the message. 936 """) 937
938 - def _get_delivery_count(self):
939 return pn_message_get_delivery_count(self._msg)
940
941 - def _set_delivery_count(self, value):
942 self._check(pn_message_set_delivery_count(self._msg, value))
943 944 delivery_count = property(_get_delivery_count, _set_delivery_count, 945 doc=""" 946 The number of delivery attempts made for this message. 947 """) 948 949
950 - def _get_id(self):
951 return self._id.get_object()
952 - def _set_id(self, value):
953 if type(value) in _compat.INT_TYPES: 954 value = ulong(value) 955 self._id.rewind() 956 self._id.put_object(value)
957 id = property(_get_id, _set_id, 958 doc=""" 959 The id of the message. 960 """) 961
962 - def _get_user_id(self):
963 return pn_message_get_user_id(self._msg)
964
965 - def _set_user_id(self, value):
966 self._check(pn_message_set_user_id(self._msg, value))
967 968 user_id = property(_get_user_id, _set_user_id, 969 doc=""" 970 The user id of the message creator. 971 """) 972
973 - def _get_address(self):
974 return utf82unicode(pn_message_get_address(self._msg))
975
976 - def _set_address(self, value):
977 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
978 979 address = property(_get_address, _set_address, 980 doc=""" 981 The address of the message. 982 """) 983
984 - def _get_subject(self):
985 return pn_message_get_subject(self._msg)
986
987 - def _set_subject(self, value):
988 self._check(pn_message_set_subject(self._msg, value))
989 990 subject = property(_get_subject, _set_subject, 991 doc=""" 992 The subject of the message. 993 """) 994
995 - def _get_reply_to(self):
996 return utf82unicode(pn_message_get_reply_to(self._msg))
997
998 - def _set_reply_to(self, value):
999 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
1000 1001 reply_to = property(_get_reply_to, _set_reply_to, 1002 doc=""" 1003 The reply-to address for the message. 1004 """) 1005
1006 - def _get_correlation_id(self):
1007 return self._correlation_id.get_object()
1008 - def _set_correlation_id(self, value):
1009 if type(value) in _compat.INT_TYPES: 1010 value = ulong(value) 1011 self._correlation_id.rewind() 1012 self._correlation_id.put_object(value)
1013 1014 correlation_id = property(_get_correlation_id, _set_correlation_id, 1015 doc=""" 1016 The correlation-id for the message. 1017 """) 1018
1019 - def _get_content_type(self):
1020 return pn_message_get_content_type(self._msg)
1021
1022 - def _set_content_type(self, value):
1023 self._check(pn_message_set_content_type(self._msg, value))
1024 1025 content_type = property(_get_content_type, _set_content_type, 1026 doc=""" 1027 The content-type of the message. 1028 """) 1029
1030 - def _get_content_encoding(self):
1031 return pn_message_get_content_encoding(self._msg)
1032
1033 - def _set_content_encoding(self, value):
1034 self._check(pn_message_set_content_encoding(self._msg, value))
1035 1036 content_encoding = property(_get_content_encoding, _set_content_encoding, 1037 doc=""" 1038 The content-encoding of the message. 1039 """) 1040
1041 - def _get_expiry_time(self):
1042 return millis2secs(pn_message_get_expiry_time(self._msg))
1043
1044 - def _set_expiry_time(self, value):
1045 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1046 1047 expiry_time = property(_get_expiry_time, _set_expiry_time, 1048 doc=""" 1049 The expiry time of the message. 1050 """) 1051
1052 - def _get_creation_time(self):
1053 return millis2secs(pn_message_get_creation_time(self._msg))
1054
1055 - def _set_creation_time(self, value):
1056 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1057 1058 creation_time = property(_get_creation_time, _set_creation_time, 1059 doc=""" 1060 The creation time of the message. 1061 """) 1062
1063 - def _get_group_id(self):
1064 return pn_message_get_group_id(self._msg)
1065
1066 - def _set_group_id(self, value):
1067 self._check(pn_message_set_group_id(self._msg, value))
1068 1069 group_id = property(_get_group_id, _set_group_id, 1070 doc=""" 1071 The group id of the message. 1072 """) 1073
1074 - def _get_group_sequence(self):
1075 return pn_message_get_group_sequence(self._msg)
1076
1077 - def _set_group_sequence(self, value):
1078 self._check(pn_message_set_group_sequence(self._msg, value))
1079 1080 group_sequence = property(_get_group_sequence, _set_group_sequence, 1081 doc=""" 1082 The sequence of the message within its group. 1083 """) 1084
1085 - def _get_reply_to_group_id(self):
1086 return pn_message_get_reply_to_group_id(self._msg)
1087
1088 - def _set_reply_to_group_id(self, value):
1089 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1090 1091 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, 1092 doc=""" 1093 The group-id for any replies. 1094 """) 1095
1096 - def encode(self):
1097 self._pre_encode() 1098 sz = 16 1099 while True: 1100 err, data = pn_message_encode(self._msg, sz) 1101 if err == PN_OVERFLOW: 1102 sz *= 2 1103 continue 1104 else: 1105 self._check(err) 1106 return data
1107
1108 - def decode(self, data):
1109 self._check(pn_message_decode(self._msg, data)) 1110 self._post_decode()
1111
1112 - def send(self, sender, tag=None):
1113 dlv = sender.delivery(tag or sender.delivery_tag()) 1114 encoded = self.encode() 1115 sender.stream(encoded) 1116 sender.advance() 1117 if sender.snd_settle_mode == Link.SND_SETTLED: 1118 dlv.settle() 1119 return dlv
1120
1121 - def recv(self, link):
1122 """ 1123 Receives and decodes the message content for the current delivery 1124 from the link. Upon success it will return the current delivery 1125 for the link. If there is no current delivery, or if the current 1126 delivery is incomplete, or if the link is not a receiver, it will 1127 return None. 1128 1129 @type link: Link 1130 @param link: the link to receive a message from 1131 @return the delivery associated with the decoded message (or None) 1132 1133 """ 1134 if link.is_sender: return None 1135 dlv = link.current 1136 if not dlv or dlv.partial: return None 1137 encoded = link.recv(dlv.pending) 1138 link.advance() 1139 # the sender has already forgotten about the delivery, so we might 1140 # as well too 1141 if link.remote_snd_settle_mode == Link.SND_SETTLED: 1142 dlv.settle() 1143 self.decode(encoded) 1144 return dlv
1145
1146 - def __repr2__(self):
1147 props = [] 1148 for attr in ("inferred", "address", "reply_to", "durable", "ttl", 1149 "priority", "first_acquirer", "delivery_count", "id", 1150 "correlation_id", "user_id", "group_id", "group_sequence", 1151 "reply_to_group_id", "instructions", "annotations", 1152 "properties", "body"): 1153 value = getattr(self, attr) 1154 if value: props.append("%s=%r" % (attr, value)) 1155 return "Message(%s)" % ", ".join(props)
1156
1157 - def __repr__(self):
1158 tmp = pn_string(None) 1159 err = pn_inspect(self._msg, tmp) 1160 result = pn_string_get(tmp) 1161 pn_free(tmp) 1162 self._check(err) 1163 return result
1164
1165 -class Subscription(object):
1166
1167 - def __init__(self, impl):
1168 self._impl = impl
1169 1170 @property
1171 - def address(self):
1172 return pn_subscription_address(self._impl)
1173 1174 _DEFAULT = object()
1175 1176 -class Selectable(Wrapper):
1177 1178 @staticmethod
1179 - def wrap(impl):
1180 if impl is None: 1181 return None 1182 else: 1183 return Selectable(impl)
1184
1185 - def __init__(self, impl):
1186 Wrapper.__init__(self, impl, pn_selectable_attachments)
1187
1188 - def _init(self):
1189 pass
1190
1191 - def fileno(self, fd = _DEFAULT):
1192 if fd is _DEFAULT: 1193 return pn_selectable_get_fd(self._impl) 1194 elif fd is None: 1195 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET) 1196 else: 1197 pn_selectable_set_fd(self._impl, fd)
1198
1199 - def _is_reading(self):
1200 return pn_selectable_is_reading(self._impl)
1201
1202 - def _set_reading(self, val):
1203 pn_selectable_set_reading(self._impl, bool(val))
1204 1205 reading = property(_is_reading, _set_reading) 1206
1207 - def _is_writing(self):
1208 return pn_selectable_is_writing(self._impl)
1209
1210 - def _set_writing(self, val):
1211 pn_selectable_set_writing(self._impl, bool(val))
1212 1213 writing = property(_is_writing, _set_writing) 1214
1215 - def _get_deadline(self):
1216 tstamp = pn_selectable_get_deadline(self._impl) 1217 if tstamp: 1218 return millis2secs(tstamp) 1219 else: 1220 return None
1221
1222 - def _set_deadline(self, deadline):
1223 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1224 1225 deadline = property(_get_deadline, _set_deadline) 1226
1227 - def readable(self):
1228 pn_selectable_readable(self._impl)
1229
1230 - def writable(self):
1231 pn_selectable_writable(self._impl)
1232
1233 - def expired(self):
1234 pn_selectable_expired(self._impl)
1235
1236 - def _is_registered(self):
1237 return pn_selectable_is_registered(self._impl)
1238
1239 - def _set_registered(self, registered):
1240 pn_selectable_set_registered(self._impl, registered)
1241 1242 registered = property(_is_registered, _set_registered, 1243 doc=""" 1244 The registered property may be get/set by an I/O polling system to 1245 indicate whether the fd has been registered or not. 1246 """) 1247 1248 @property
1249 - def is_terminal(self):
1250 return pn_selectable_is_terminal(self._impl)
1251
1252 - def terminate(self):
1253 pn_selectable_terminate(self._impl)
1254
1255 - def release(self):
1256 pn_selectable_release(self._impl)
1257
1258 -class DataException(ProtonException):
1259 """ 1260 The DataException class is the root of the Data exception hierarchy. 1261 All exceptions raised by the Data class extend this exception. 1262 """ 1263 pass
1264
1265 -class UnmappedType:
1266
1267 - def __init__(self, msg):
1268 self.msg = msg
1269
1270 - def __repr__(self):
1271 return "UnmappedType(%s)" % self.msg
1272
1273 -class ulong(long):
1274
1275 - def __repr__(self):
1276 return "ulong(%s)" % long.__repr__(self)
1277
1278 -class timestamp(long):
1279
1280 - def __repr__(self):
1281 return "timestamp(%s)" % long.__repr__(self)
1282
1283 -class symbol(unicode):
1284
1285 - def __repr__(self):
1286 return "symbol(%s)" % unicode.__repr__(self)
1287
1288 -class char(unicode):
1289
1290 - def __repr__(self):
1291 return "char(%s)" % unicode.__repr__(self)
1292
1293 -class Described(object):
1294
1295 - def __init__(self, descriptor, value):
1296 self.descriptor = descriptor 1297 self.value = value
1298
1299 - def __repr__(self):
1300 return "Described(%r, %r)" % (self.descriptor, self.value)
1301
1302 - def __eq__(self, o):
1303 if isinstance(o, Described): 1304 return self.descriptor == o.descriptor and self.value == o.value 1305 else: 1306 return False
1307 1308 UNDESCRIBED = Constant("UNDESCRIBED")
1309 1310 -class Array(object):
1311
1312 - def __init__(self, descriptor, type, *elements):
1313 self.descriptor = descriptor 1314 self.type = type 1315 self.elements = elements
1316
1317 - def __iter__(self):
1318 return iter(self.elements)
1319
1320 - def __repr__(self):
1321 if self.elements: 1322 els = ", %s" % (", ".join(map(repr, self.elements))) 1323 else: 1324 els = "" 1325 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1326
1327 - def __eq__(self, o):
1328 if isinstance(o, Array): 1329 return self.descriptor == o.descriptor and \ 1330 self.type == o.type and self.elements == o.elements 1331 else: 1332 return False
1333
1334 -class Data:
1335 """ 1336 The L{Data} class provides an interface for decoding, extracting, 1337 creating, and encoding arbitrary AMQP data. A L{Data} object 1338 contains a tree of AMQP values. Leaf nodes in this tree correspond 1339 to scalars in the AMQP type system such as L{ints<INT>} or 1340 L{strings<STRING>}. Non-leaf nodes in this tree correspond to 1341 compound values in the AMQP type system such as L{lists<LIST>}, 1342 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. 1343 The root node of the tree is the L{Data} object itself and can have 1344 an arbitrary number of children. 1345 1346 A L{Data} object maintains the notion of the current sibling node 1347 and a current parent node. Siblings are ordered within their parent. 1348 Values are accessed and/or added by using the L{next}, L{prev}, 1349 L{enter}, and L{exit} methods to navigate to the desired location in 1350 the tree and using the supplied variety of put_*/get_* methods to 1351 access or add a value of the desired type. 1352 1353 The put_* methods will always add a value I{after} the current node 1354 in the tree. If the current node has a next sibling the put_* method 1355 will overwrite the value on this node. If there is no current node 1356 or the current node has no next sibling then one will be added. The 1357 put_* methods always set the added/modified node to the current 1358 node. The get_* methods read the value of the current node and do 1359 not change which node is current. 1360 1361 The following types of scalar values are supported: 1362 1363 - L{NULL} 1364 - L{BOOL} 1365 - L{UBYTE} 1366 - L{USHORT} 1367 - L{SHORT} 1368 - L{UINT} 1369 - L{INT} 1370 - L{ULONG} 1371 - L{LONG} 1372 - L{FLOAT} 1373 - L{DOUBLE} 1374 - L{BINARY} 1375 - L{STRING} 1376 - L{SYMBOL} 1377 1378 The following types of compound values are supported: 1379 1380 - L{DESCRIBED} 1381 - L{ARRAY} 1382 - L{LIST} 1383 - L{MAP} 1384 """ 1385 1386 NULL = PN_NULL; "A null value." 1387 BOOL = PN_BOOL; "A boolean value." 1388 UBYTE = PN_UBYTE; "An unsigned byte value." 1389 BYTE = PN_BYTE; "A signed byte value." 1390 USHORT = PN_USHORT; "An unsigned short value." 1391 SHORT = PN_SHORT; "A short value." 1392 UINT = PN_UINT; "An unsigned int value." 1393 INT = PN_INT; "A signed int value." 1394 CHAR = PN_CHAR; "A character value." 1395 ULONG = PN_ULONG; "An unsigned long value." 1396 LONG = PN_LONG; "A signed long value." 1397 TIMESTAMP = PN_TIMESTAMP; "A timestamp value." 1398 FLOAT = PN_FLOAT; "A float value." 1399 DOUBLE = PN_DOUBLE; "A double value." 1400 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." 1401 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." 1402 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." 1403 UUID = PN_UUID; "A UUID value." 1404 BINARY = PN_BINARY; "A binary string." 1405 STRING = PN_STRING; "A unicode string." 1406 SYMBOL = PN_SYMBOL; "A symbolic string." 1407 DESCRIBED = PN_DESCRIBED; "A described value." 1408 ARRAY = PN_ARRAY; "An array value." 1409 LIST = PN_LIST; "A list value." 1410 MAP = PN_MAP; "A map value." 1411 1412 type_names = { 1413 NULL: "null", 1414 BOOL: "bool", 1415 BYTE: "byte", 1416 UBYTE: "ubyte", 1417 SHORT: "short", 1418 USHORT: "ushort", 1419 INT: "int", 1420 UINT: "uint", 1421 CHAR: "char", 1422 LONG: "long", 1423 ULONG: "ulong", 1424 TIMESTAMP: "timestamp", 1425 FLOAT: "float", 1426 DOUBLE: "double", 1427 DECIMAL32: "decimal32", 1428 DECIMAL64: "decimal64", 1429 DECIMAL128: "decimal128", 1430 UUID: "uuid", 1431 BINARY: "binary", 1432 STRING: "string", 1433 SYMBOL: "symbol", 1434 DESCRIBED: "described", 1435 ARRAY: "array", 1436 LIST: "list", 1437 MAP: "map" 1438 } 1439 1440 @classmethod
1441 - def type_name(type): return Data.type_names[type]
1442
1443 - def __init__(self, capacity=16):
1444 if type(capacity) in _compat.INT_TYPES: 1445 self._data = pn_data(capacity) 1446 self._free = True 1447 else: 1448 self._data = capacity 1449 self._free = False
1450
1451 - def __del__(self):
1452 if self._free and hasattr(self, "_data"): 1453 pn_data_free(self._data) 1454 del self._data
1455
1456 - def _check(self, err):
1457 if err < 0: 1458 exc = EXCEPTIONS.get(err, DataException) 1459 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) 1460 else: 1461 return err
1462
1463 - def clear(self):
1464 """ 1465 Clears the data object. 1466 """ 1467 pn_data_clear(self._data)
1468
1469 - def rewind(self):
1470 """ 1471 Clears current node and sets the parent to the root node. Clearing the 1472 current node sets it _before_ the first node, calling next() will advance to 1473 the first node. 1474 """ 1475 assert self._data is not None 1476 pn_data_rewind(self._data)
1477
1478 - def next(self):
1479 """ 1480 Advances the current node to its next sibling and returns its 1481 type. If there is no next sibling the current node remains 1482 unchanged and None is returned. 1483 """ 1484 found = pn_data_next(self._data) 1485 if found: 1486 return self.type() 1487 else: 1488 return None
1489
1490 - def prev(self):
1491 """ 1492 Advances the current node to its previous sibling and returns its 1493 type. If there is no previous sibling the current node remains 1494 unchanged and None is returned. 1495 """ 1496 found = pn_data_prev(self._data) 1497 if found: 1498 return self.type() 1499 else: 1500 return None
1501
1502 - def enter(self):
1503 """ 1504 Sets the parent node to the current node and clears the current node. 1505 Clearing the current node sets it _before_ the first child, 1506 call next() advances to the first child. 1507 """ 1508 return pn_data_enter(self._data)
1509
1510 - def exit(self):
1511 """ 1512 Sets the current node to the parent node and the parent node to 1513 its own parent. 1514 """ 1515 return pn_data_exit(self._data)
1516
1517 - def lookup(self, name):
1518 return pn_data_lookup(self._data, name)
1519
1520 - def narrow(self):
1521 pn_data_narrow(self._data)
1522
1523 - def widen(self):
1524 pn_data_widen(self._data)
1525
1526 - def type(self):
1527 """ 1528 Returns the type of the current node. 1529 """ 1530 dtype = pn_data_type(self._data) 1531 if dtype == -1: 1532 return None 1533 else: 1534 return dtype
1535
1536 - def encoded_size(self):
1537 """ 1538 Returns the size in bytes needed to encode the data in AMQP format. 1539 """ 1540 return pn_data_encoded_size(self._data)
1541
1542 - def encode(self):
1543 """ 1544 Returns a representation of the data encoded in AMQP format. 1545 """ 1546 size = 1024 1547 while True: 1548 cd, enc = pn_data_encode(self._data, size) 1549 if cd == PN_OVERFLOW: 1550 size *= 2 1551 elif cd >= 0: 1552 return enc 1553 else: 1554 self._check(cd)
1555
1556 - def decode(self, encoded):
1557 """ 1558 Decodes the first value from supplied AMQP data and returns the 1559 number of bytes consumed. 1560 1561 @type encoded: binary 1562 @param encoded: AMQP encoded binary data 1563 """ 1564 return self._check(pn_data_decode(self._data, encoded))
1565
1566 - def put_list(self):
1567 """ 1568 Puts a list value. Elements may be filled by entering the list 1569 node and putting element values. 1570 1571 >>> data = Data() 1572 >>> data.put_list() 1573 >>> data.enter() 1574 >>> data.put_int(1) 1575 >>> data.put_int(2) 1576 >>> data.put_int(3) 1577 >>> data.exit() 1578 """ 1579 self._check(pn_data_put_list(self._data))
1580
1581 - def put_map(self):
1582 """ 1583 Puts a map value. Elements may be filled by entering the map node 1584 and putting alternating key value pairs. 1585 1586 >>> data = Data() 1587 >>> data.put_map() 1588 >>> data.enter() 1589 >>> data.put_string("key") 1590 >>> data.put_string("value") 1591 >>> data.exit() 1592 """ 1593 self._check(pn_data_put_map(self._data))
1594
1595 - def put_array(self, described, element_type):
1596 """ 1597 Puts an array value. Elements may be filled by entering the array 1598 node and putting the element values. The values must all be of the 1599 specified array element type. If an array is described then the 1600 first child value of the array is the descriptor and may be of any 1601 type. 1602 1603 >>> data = Data() 1604 >>> 1605 >>> data.put_array(False, Data.INT) 1606 >>> data.enter() 1607 >>> data.put_int(1) 1608 >>> data.put_int(2) 1609 >>> data.put_int(3) 1610 >>> data.exit() 1611 >>> 1612 >>> data.put_array(True, Data.DOUBLE) 1613 >>> data.enter() 1614 >>> data.put_symbol("array-descriptor") 1615 >>> data.put_double(1.1) 1616 >>> data.put_double(1.2) 1617 >>> data.put_double(1.3) 1618 >>> data.exit() 1619 1620 @type described: bool 1621 @param described: specifies whether the array is described 1622 @type element_type: int 1623 @param element_type: the type of the array elements 1624 """ 1625 self._check(pn_data_put_array(self._data, described, element_type))
1626
1627 - def put_described(self):
1628 """ 1629 Puts a described value. A described node has two children, the 1630 descriptor and the value. These are specified by entering the node 1631 and putting the desired values. 1632 1633 >>> data = Data() 1634 >>> data.put_described() 1635 >>> data.enter() 1636 >>> data.put_symbol("value-descriptor") 1637 >>> data.put_string("the value") 1638 >>> data.exit() 1639 """ 1640 self._check(pn_data_put_described(self._data))
1641
1642 - def put_null(self):
1643 """ 1644 Puts a null value. 1645 """ 1646 self._check(pn_data_put_null(self._data))
1647
1648 - def put_bool(self, b):
1649 """ 1650 Puts a boolean value. 1651 1652 @param b: a boolean value 1653 """ 1654 self._check(pn_data_put_bool(self._data, b))
1655
1656 - def put_ubyte(self, ub):
1657 """ 1658 Puts an unsigned byte value. 1659 1660 @param ub: an integral value 1661 """ 1662 self._check(pn_data_put_ubyte(self._data, ub))
1663
1664 - def put_byte(self, b):
1665 """ 1666 Puts a signed byte value. 1667 1668 @param b: an integral value 1669 """ 1670 self._check(pn_data_put_byte(self._data, b))
1671
1672 - def put_ushort(self, us):
1673 """ 1674 Puts an unsigned short value. 1675 1676 @param us: an integral value. 1677 """ 1678 self._check(pn_data_put_ushort(self._data, us))
1679
1680 - def put_short(self, s):
1681 """ 1682 Puts a signed short value. 1683 1684 @param s: an integral value 1685 """ 1686 self._check(pn_data_put_short(self._data, s))
1687
1688 - def put_uint(self, ui):
1689 """ 1690 Puts an unsigned int value. 1691 1692 @param ui: an integral value 1693 """ 1694 self._check(pn_data_put_uint(self._data, ui))
1695
1696 - def put_int(self, i):
1697 """ 1698 Puts a signed int value. 1699 1700 @param i: an integral value 1701 """ 1702 self._check(pn_data_put_int(self._data, i))
1703
1704 - def put_char(self, c):
1705 """ 1706 Puts a char value. 1707 1708 @param c: a single character 1709 """ 1710 self._check(pn_data_put_char(self._data, ord(c)))
1711
1712 - def put_ulong(self, ul):
1713 """ 1714 Puts an unsigned long value. 1715 1716 @param ul: an integral value 1717 """ 1718 self._check(pn_data_put_ulong(self._data, ul))
1719
1720 - def put_long(self, l):
1721 """ 1722 Puts a signed long value. 1723 1724 @param l: an integral value 1725 """ 1726 self._check(pn_data_put_long(self._data, l))
1727
1728 - def put_timestamp(self, t):
1729 """ 1730 Puts a timestamp value. 1731 1732 @param t: an integral value 1733 """ 1734 self._check(pn_data_put_timestamp(self._data, t))
1735
1736 - def put_float(self, f):
1737 """ 1738 Puts a float value. 1739 1740 @param f: a floating point value 1741 """ 1742 self._check(pn_data_put_float(self._data, f))
1743
1744 - def put_double(self, d):
1745 """ 1746 Puts a double value. 1747 1748 @param d: a floating point value. 1749 """ 1750 self._check(pn_data_put_double(self._data, d))
1751
1752 - def put_decimal32(self, d):
1753 """ 1754 Puts a decimal32 value. 1755 1756 @param d: a decimal32 value 1757 """ 1758 self._check(pn_data_put_decimal32(self._data, d))
1759
1760 - def put_decimal64(self, d):
1761 """ 1762 Puts a decimal64 value. 1763 1764 @param d: a decimal64 value 1765 """ 1766 self._check(pn_data_put_decimal64(self._data, d))
1767
1768 - def put_decimal128(self, d):
1769 """ 1770 Puts a decimal128 value. 1771 1772 @param d: a decimal128 value 1773 """ 1774 self._check(pn_data_put_decimal128(self._data, d))
1775
1776 - def put_uuid(self, u):
1777 """ 1778 Puts a UUID value. 1779 1780 @param u: a uuid value 1781 """ 1782 self._check(pn_data_put_uuid(self._data, u.bytes))
1783
1784 - def put_binary(self, b):
1785 """ 1786 Puts a binary value. 1787 1788 @type b: binary 1789 @param b: a binary value 1790 """ 1791 self._check(pn_data_put_binary(self._data, b))
1792
1793 - def put_string(self, s):
1794 """ 1795 Puts a unicode value. 1796 1797 @type s: unicode 1798 @param s: a unicode value 1799 """ 1800 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1801
1802 - def put_symbol(self, s):
1803 """ 1804 Puts a symbolic value. 1805 1806 @type s: string 1807 @param s: the symbol name 1808 """ 1809 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1810
1811 - def get_list(self):
1812 """ 1813 If the current node is a list, return the number of elements, 1814 otherwise return zero. List elements can be accessed by entering 1815 the list. 1816 1817 >>> count = data.get_list() 1818 >>> data.enter() 1819 >>> for i in range(count): 1820 ... type = data.next() 1821 ... if type == Data.STRING: 1822 ... print data.get_string() 1823 ... elif type == ...: 1824 ... ... 1825 >>> data.exit() 1826 """ 1827 return pn_data_get_list(self._data)
1828
1829 - def get_map(self):
1830 """ 1831 If the current node is a map, return the number of child elements, 1832 otherwise return zero. Key value pairs can be accessed by entering 1833 the map. 1834 1835 >>> count = data.get_map() 1836 >>> data.enter() 1837 >>> for i in range(count/2): 1838 ... type = data.next() 1839 ... if type == Data.STRING: 1840 ... print data.get_string() 1841 ... elif type == ...: 1842 ... ... 1843 >>> data.exit() 1844 """ 1845 return pn_data_get_map(self._data)
1846
1847 - def get_array(self):
1848 """ 1849 If the current node is an array, return a tuple of the element 1850 count, a boolean indicating whether the array is described, and 1851 the type of each element, otherwise return (0, False, None). Array 1852 data can be accessed by entering the array. 1853 1854 >>> # read an array of strings with a symbolic descriptor 1855 >>> count, described, type = data.get_array() 1856 >>> data.enter() 1857 >>> data.next() 1858 >>> print "Descriptor:", data.get_symbol() 1859 >>> for i in range(count): 1860 ... data.next() 1861 ... print "Element:", data.get_string() 1862 >>> data.exit() 1863 """ 1864 count = pn_data_get_array(self._data) 1865 described = pn_data_is_array_described(self._data) 1866 type = pn_data_get_array_type(self._data) 1867 if type == -1: 1868 type = None 1869 return count, described, type
1870
1871 - def is_described(self):
1872 """ 1873 Checks if the current node is a described value. The descriptor 1874 and value may be accessed by entering the described value. 1875 1876 >>> # read a symbolically described string 1877 >>> assert data.is_described() # will error if the current node is not described 1878 >>> data.enter() 1879 >>> data.next() 1880 >>> print data.get_symbol() 1881 >>> data.next() 1882 >>> print data.get_string() 1883 >>> data.exit() 1884 """ 1885 return pn_data_is_described(self._data)
1886
1887 - def is_null(self):
1888 """ 1889 Checks if the current node is a null. 1890 """ 1891 return pn_data_is_null(self._data)
1892
1893 - def get_bool(self):
1894 """ 1895 If the current node is a boolean, returns its value, returns False 1896 otherwise. 1897 """ 1898 return pn_data_get_bool(self._data)
1899
1900 - def get_ubyte(self):
1901 """ 1902 If the current node is an unsigned byte, returns its value, 1903 returns 0 otherwise. 1904 """ 1905 return pn_data_get_ubyte(self._data)
1906
1907 - def get_byte(self):
1908 """ 1909 If the current node is a signed byte, returns its value, returns 0 1910 otherwise. 1911 """ 1912 return pn_data_get_byte(self._data)
1913
1914 - def get_ushort(self):
1915 """ 1916 If the current node is an unsigned short, returns its value, 1917 returns 0 otherwise. 1918 """ 1919 return pn_data_get_ushort(self._data)
1920
1921 - def get_short(self):
1922 """ 1923 If the current node is a signed short, returns its value, returns 1924 0 otherwise. 1925 """ 1926 return pn_data_get_short(self._data)
1927
1928 - def get_uint(self):
1929 """ 1930 If the current node is an unsigned int, returns its value, returns 1931 0 otherwise. 1932 """ 1933 return pn_data_get_uint(self._data)
1934
1935 - def get_int(self):
1936 """ 1937 If the current node is a signed int, returns its value, returns 0 1938 otherwise. 1939 """ 1940 return int(pn_data_get_int(self._data))
1941
1942 - def get_char(self):
1943 """ 1944 If the current node is a char, returns its value, returns 0 1945 otherwise. 1946 """ 1947 return char(_compat.unichar(pn_data_get_char(self._data)))
1948
1949 - def get_ulong(self):
1950 """ 1951 If the current node is an unsigned long, returns its value, 1952 returns 0 otherwise. 1953 """ 1954 return ulong(pn_data_get_ulong(self._data))
1955
1956 - def get_long(self):
1957 """ 1958 If the current node is an signed long, returns its value, returns 1959 0 otherwise. 1960 """ 1961 return long(pn_data_get_long(self._data))
1962
1963 - def get_timestamp(self):
1964 """ 1965 If the current node is a timestamp, returns its value, returns 0 1966 otherwise. 1967 """ 1968 return timestamp(pn_data_get_timestamp(self._data))
1969
1970 - def get_float(self):
1971 """ 1972 If the current node is a float, returns its value, raises 0 1973 otherwise. 1974 """ 1975 return pn_data_get_float(self._data)
1976
1977 - def get_double(self):
1978 """ 1979 If the current node is a double, returns its value, returns 0 1980 otherwise. 1981 """ 1982 return pn_data_get_double(self._data)
1983 1984 # XXX: need to convert
1985 - def get_decimal32(self):
1986 """ 1987 If the current node is a decimal32, returns its value, returns 0 1988 otherwise. 1989 """ 1990 return pn_data_get_decimal32(self._data)
1991 1992 # XXX: need to convert
1993 - def get_decimal64(self):
1994 """ 1995 If the current node is a decimal64, returns its value, returns 0 1996 otherwise. 1997 """ 1998 return pn_data_get_decimal64(self._data)
1999 2000 # XXX: need to convert
2001 - def get_decimal128(self):
2002 """ 2003 If the current node is a decimal128, returns its value, returns 0 2004 otherwise. 2005 """ 2006 return pn_data_get_decimal128(self._data)
2007
2008 - def get_uuid(self):
2009 """ 2010 If the current node is a UUID, returns its value, returns None 2011 otherwise. 2012 """ 2013 if pn_data_type(self._data) == Data.UUID: 2014 return uuid.UUID(bytes=pn_data_get_uuid(self._data)) 2015 else: 2016 return None
2017
2018 - def get_binary(self):
2019 """ 2020 If the current node is binary, returns its value, returns "" 2021 otherwise. 2022 """ 2023 return pn_data_get_binary(self._data)
2024
2025 - def get_string(self):
2026 """ 2027 If the current node is a string, returns its value, returns "" 2028 otherwise. 2029 """ 2030 return pn_data_get_string(self._data).decode("utf8")
2031
2032 - def get_symbol(self):
2033 """ 2034 If the current node is a symbol, returns its value, returns "" 2035 otherwise. 2036 """ 2037 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
2038
2039 - def copy(self, src):
2040 self._check(pn_data_copy(self._data, src._data))
2041
2042 - def format(self):
2043 sz = 16 2044 while True: 2045 err, result = pn_data_format(self._data, sz) 2046 if err == PN_OVERFLOW: 2047 sz *= 2 2048 continue 2049 else: 2050 self._check(err) 2051 return result
2052
2053 - def dump(self):
2054 pn_data_dump(self._data)
2055
2056 - def put_dict(self, d):
2057 self.put_map() 2058 self.enter() 2059 try: 2060 for k, v in d.items(): 2061 self.put_object(k) 2062 self.put_object(v) 2063 finally: 2064 self.exit()
2065
2066 - def get_dict(self):
2067 if self.enter(): 2068 try: 2069 result = {} 2070 while self.next(): 2071 k = self.get_object() 2072 if self.next(): 2073 v = self.get_object() 2074 else: 2075 v = None 2076 result[k] = v 2077 finally: 2078 self.exit() 2079 return result
2080
2081 - def put_sequence(self, s):
2082 self.put_list() 2083 self.enter() 2084 try: 2085 for o in s: 2086 self.put_object(o) 2087 finally: 2088 self.exit()
2089
2090 - def get_sequence(self):
2091 if self.enter(): 2092 try: 2093 result = [] 2094 while self.next(): 2095 result.append(self.get_object()) 2096 finally: 2097 self.exit() 2098 return result
2099
2100 - def get_py_described(self):
2101 if self.enter(): 2102 try: 2103 self.next() 2104 descriptor = self.get_object() 2105 self.next() 2106 value = self.get_object() 2107 finally: 2108 self.exit() 2109 return Described(descriptor, value)
2110
2111 - def put_py_described(self, d):
2112 self.put_described() 2113 self.enter() 2114 try: 2115 self.put_object(d.descriptor) 2116 self.put_object(d.value) 2117 finally: 2118 self.exit()
2119
2120 - def get_py_array(self):
2121 """ 2122 If the current node is an array, return an Array object 2123 representing the array and its contents. Otherwise return None. 2124 This is a convenience wrapper around get_array, enter, etc. 2125 """ 2126 2127 count, described, type = self.get_array() 2128 if type is None: return None 2129 if self.enter(): 2130 try: 2131 if described: 2132 self.next() 2133 descriptor = self.get_object() 2134 else: 2135 descriptor = UNDESCRIBED 2136 elements = [] 2137 while self.next(): 2138 elements.append(self.get_object()) 2139 finally: 2140 self.exit() 2141 return Array(descriptor, type, *elements)
2142
2143 - def put_py_array(self, a):
2144 described = a.descriptor != UNDESCRIBED 2145 self.put_array(described, a.type) 2146 self.enter() 2147 try: 2148 if described: 2149 self.put_object(a.descriptor) 2150 for e in a.elements: 2151 self.put_object(e) 2152 finally: 2153 self.exit()
2154 2155 put_mappings = { 2156 None.__class__: lambda s, _: s.put_null(), 2157 bool: put_bool, 2158 dict: put_dict, 2159 list: put_sequence, 2160 tuple: put_sequence, 2161 unicode: put_string, 2162 bytes: put_binary, 2163 symbol: put_symbol, 2164 long: put_long, 2165 char: put_char, 2166 ulong: put_ulong, 2167 timestamp: put_timestamp, 2168 float: put_double, 2169 uuid.UUID: put_uuid, 2170 Described: put_py_described, 2171 Array: put_py_array 2172 } 2173 # for python 3.x, long is merely an alias for int, but for python 2.x 2174 # we need to add an explicit int since it is a different type 2175 if int not in put_mappings: 2176 put_mappings[int] = put_int 2177 2178 get_mappings = { 2179 NULL: lambda s: None, 2180 BOOL: get_bool, 2181 BYTE: get_byte, 2182 UBYTE: get_ubyte, 2183 SHORT: get_short, 2184 USHORT: get_ushort, 2185 INT: get_int, 2186 UINT: get_uint, 2187 CHAR: get_char, 2188 LONG: get_long, 2189 ULONG: get_ulong, 2190 TIMESTAMP: get_timestamp, 2191 FLOAT: get_float, 2192 DOUBLE: get_double, 2193 DECIMAL32: get_decimal32, 2194 DECIMAL64: get_decimal64, 2195 DECIMAL128: get_decimal128, 2196 UUID: get_uuid, 2197 BINARY: get_binary, 2198 STRING: get_string, 2199 SYMBOL: get_symbol, 2200 DESCRIBED: get_py_described, 2201 ARRAY: get_py_array, 2202 LIST: get_sequence, 2203 MAP: get_dict 2204 } 2205 2206
2207 - def put_object(self, obj):
2208 putter = self.put_mappings[obj.__class__] 2209 putter(self, obj)
2210
2211 - def get_object(self):
2212 type = self.type() 2213 if type is None: return None 2214 getter = self.get_mappings.get(type) 2215 if getter: 2216 return getter(self) 2217 else: 2218 return UnmappedType(str(type))
2219
2220 -class ConnectionException(ProtonException):
2221 pass
2222
2223 -class Endpoint(object):
2224 2225 LOCAL_UNINIT = PN_LOCAL_UNINIT 2226 REMOTE_UNINIT = PN_REMOTE_UNINIT 2227 LOCAL_ACTIVE = PN_LOCAL_ACTIVE 2228 REMOTE_ACTIVE = PN_REMOTE_ACTIVE 2229 LOCAL_CLOSED = PN_LOCAL_CLOSED 2230 REMOTE_CLOSED = PN_REMOTE_CLOSED 2231
2232 - def _init(self):
2233 self.condition = None
2234
2235 - def _update_cond(self):
2236 obj2cond(self.condition, self._get_cond_impl())
2237 2238 @property
2239 - def remote_condition(self):
2240 return cond2obj(self._get_remote_cond_impl())
2241 2242 # the following must be provided by subclasses
2243 - def _get_cond_impl(self):
2244 assert False, "Subclass must override this!"
2245
2246 - def _get_remote_cond_impl(self):
2247 assert False, "Subclass must override this!"
2248
2249 - def _get_handler(self):
2250 from . import reactor 2251 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 2252 if ractor: 2253 on_error = ractor.on_error 2254 else: 2255 on_error = None 2256 record = self._get_attachments() 2257 return WrappedHandler.wrap(pn_record_get_handler(record), on_error)
2258
2259 - def _set_handler(self, handler):
2260 from . import reactor 2261 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 2262 if ractor: 2263 on_error = ractor.on_error 2264 else: 2265 on_error = None 2266 impl = _chandler(handler, on_error) 2267 record = self._get_attachments() 2268 pn_record_set_handler(record, impl) 2269 pn_decref(impl)
2270 2271 handler = property(_get_handler, _set_handler) 2272 2273 @property
2274 - def transport(self):
2275 return self.connection.transport
2276
2277 -class Condition:
2278
2279 - def __init__(self, name, description=None, info=None):
2280 self.name = name 2281 self.description = description 2282 self.info = info
2283
2284 - def __repr__(self):
2285 return "Condition(%s)" % ", ".join([repr(x) for x in 2286 (self.name, self.description, self.info) 2287 if x])
2288
2289 - def __eq__(self, o):
2290 if not isinstance(o, Condition): return False 2291 return self.name == o.name and \ 2292 self.description == o.description and \ 2293 self.info == o.info
2294
2295 -def obj2cond(obj, cond):
2296 pn_condition_clear(cond) 2297 if obj: 2298 pn_condition_set_name(cond, str(obj.name)) 2299 pn_condition_set_description(cond, obj.description) 2300 info = Data(pn_condition_info(cond)) 2301 if obj.info: 2302 info.put_object(obj.info)
2303
2304 -def cond2obj(cond):
2305 if pn_condition_is_set(cond): 2306 return Condition(pn_condition_get_name(cond), 2307 pn_condition_get_description(cond), 2308 dat2obj(pn_condition_info(cond))) 2309 else: 2310 return None
2311
2312 -def dat2obj(dimpl):
2313 if dimpl: 2314 d = Data(dimpl) 2315 d.rewind() 2316 d.next() 2317 obj = d.get_object() 2318 d.rewind() 2319 return obj
2320
2321 -def obj2dat(obj, dimpl):
2322 if obj is not None: 2323 d = Data(dimpl) 2324 d.put_object(obj)
2325
2326 -def secs2millis(secs):
2327 return long(secs*1000)
2328
2329 -def millis2secs(millis):
2330 return float(millis)/1000.0
2331
2332 -def timeout2millis(secs):
2333 if secs is None: return PN_MILLIS_MAX 2334 return secs2millis(secs)
2335
2336 -def millis2timeout(millis):
2337 if millis == PN_MILLIS_MAX: return None 2338 return millis2secs(millis)
2339
2340 -def unicode2utf8(string):
2341 """Some Proton APIs expect a null terminated string. Convert python text 2342 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings. 2343 This method will throw if the string cannot be converted. 2344 """ 2345 if string is None: 2346 return None 2347 if _compat.IS_PY2: 2348 if isinstance(string, unicode): 2349 return string.encode('utf-8') 2350 elif isinstance(string, str): 2351 return string 2352 else: 2353 # decoding a string results in bytes 2354 if isinstance(string, str): 2355 string = string.encode('utf-8') 2356 # fall through 2357 if isinstance(string, bytes): 2358 return string.decode('utf-8') 2359 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2360
2361 -def utf82unicode(string):
2362 """Covert C strings returned from proton-c into python unicode""" 2363 if string is None: 2364 return None 2365 if isinstance(string, _compat.TEXT_TYPES): 2366 # already unicode 2367 return string 2368 elif isinstance(string, _compat.BINARY_TYPES): 2369 return string.decode('utf8') 2370 else: 2371 raise TypeError("Unrecognized string type")
2372
2373 -class Connection(Wrapper, Endpoint):
2374 """ 2375 A representation of an AMQP connection 2376 """ 2377 2378 @staticmethod
2379 - def wrap(impl):
2380 if impl is None: 2381 return None 2382 else: 2383 return Connection(impl)
2384
2385 - def __init__(self, impl = pn_connection):
2386 Wrapper.__init__(self, impl, pn_connection_attachments)
2387
2388 - def _init(self):
2389 Endpoint._init(self) 2390 self.offered_capabilities = None 2391 self.desired_capabilities = None 2392 self.properties = None
2393
2394 - def _get_attachments(self):
2395 return pn_connection_attachments(self._impl)
2396 2397 @property
2398 - def connection(self):
2399 return self
2400 2401 @property
2402 - def transport(self):
2403 return Transport.wrap(pn_connection_transport(self._impl))
2404
2405 - def _check(self, err):
2406 if err < 0: 2407 exc = EXCEPTIONS.get(err, ConnectionException) 2408 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) 2409 else: 2410 return err
2411
2412 - def _get_cond_impl(self):
2413 return pn_connection_condition(self._impl)
2414
2415 - def _get_remote_cond_impl(self):
2416 return pn_connection_remote_condition(self._impl)
2417
2418 - def collect(self, collector):
2419 if collector is None: 2420 pn_connection_collect(self._impl, None) 2421 else: 2422 pn_connection_collect(self._impl, collector._impl) 2423 self._collector = weakref.ref(collector)
2424
2425 - def _get_container(self):
2426 return utf82unicode(pn_connection_get_container(self._impl))
2427 - def _set_container(self, name):
2428 return pn_connection_set_container(self._impl, unicode2utf8(name))
2429 2430 container = property(_get_container, _set_container) 2431
2432 - def _get_hostname(self):
2433 return utf82unicode(pn_connection_get_hostname(self._impl))
2434 - def _set_hostname(self, name):
2435 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2436 2437 hostname = property(_get_hostname, _set_hostname) 2438
2439 - def _get_user(self):
2440 return utf82unicode(pn_connection_get_user(self._impl))
2441 - def _set_user(self, name):
2442 return pn_connection_set_user(self._impl, unicode2utf8(name))
2443 2444 user = property(_get_user, _set_user) 2445
2446 - def _get_password(self):
2447 return None
2448 - def _set_password(self, name):
2449 return pn_connection_set_password(self._impl, unicode2utf8(name))
2450 2451 password = property(_get_password, _set_password) 2452 2453 @property
2454 - def remote_container(self):
2455 """The container identifier specified by the remote peer for this connection.""" 2456 return pn_connection_remote_container(self._impl)
2457 2458 @property
2459 - def remote_hostname(self):
2460 """The hostname specified by the remote peer for this connection.""" 2461 return pn_connection_remote_hostname(self._impl)
2462 2463 @property
2465 """The capabilities offered by the remote peer for this connection.""" 2466 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2467 2468 @property
2470 """The capabilities desired by the remote peer for this connection.""" 2471 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2472 2473 @property
2474 - def remote_properties(self):
2475 """The properties specified by the remote peer for this connection.""" 2476 return dat2obj(pn_connection_remote_properties(self._impl))
2477
2478 - def open(self):
2479 """ 2480 Opens the connection. 2481 2482 In more detail, this moves the local state of the connection to 2483 the ACTIVE state and triggers an open frame to be sent to the 2484 peer. A connection is fully active once both peers have opened it. 2485 """ 2486 obj2dat(self.offered_capabilities, 2487 pn_connection_offered_capabilities(self._impl)) 2488 obj2dat(self.desired_capabilities, 2489 pn_connection_desired_capabilities(self._impl)) 2490 obj2dat(self.properties, pn_connection_properties(self._impl)) 2491 pn_connection_open(self._impl)
2492
2493 - def close(self):
2494 """ 2495 Closes the connection. 2496 2497 In more detail, this moves the local state of the connection to 2498 the CLOSED state and triggers a close frame to be sent to the 2499 peer. A connection is fully closed once both peers have closed it. 2500 """ 2501 self._update_cond() 2502 pn_connection_close(self._impl)
2503 2504 @property
2505 - def state(self):
2506 """ 2507 The state of the connection as a bit field. The state has a local 2508 and a remote component. Each of these can be in one of three 2509 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking 2510 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, 2511 REMOTE_ACTIVE and REMOTE_CLOSED. 2512 """ 2513 return pn_connection_state(self._impl)
2514
2515 - def session(self):
2516 """ 2517 Returns a new session on this connection. 2518 """ 2519 ssn = pn_session(self._impl) 2520 if ssn is None: 2521 raise(SessionException("Session allocation failed.")) 2522 else: 2523 return Session(ssn)
2524
2525 - def session_head(self, mask):
2526 return Session.wrap(pn_session_head(self._impl, mask))
2527 2530 2531 @property
2532 - def work_head(self):
2533 return Delivery.wrap(pn_work_head(self._impl))
2534 2535 @property
2536 - def error(self):
2537 return pn_error_code(pn_connection_error(self._impl))
2538
2539 - def free(self):
2540 pn_connection_release(self._impl)
2541
2542 -class SessionException(ProtonException):
2543 pass
2544
2545 -class Session(Wrapper, Endpoint):
2546 2547 @staticmethod
2548 - def wrap(impl):
2549 if impl is None: 2550 return None 2551 else: 2552 return Session(impl)
2553
2554 - def __init__(self, impl):
2555 Wrapper.__init__(self, impl, pn_session_attachments)
2556
2557 - def _get_attachments(self):
2558 return pn_session_attachments(self._impl)
2559
2560 - def _get_cond_impl(self):
2561 return pn_session_condition(self._impl)
2562
2563 - def _get_remote_cond_impl(self):
2564 return pn_session_remote_condition(self._impl)
2565
2566 - def _get_incoming_capacity(self):
2567 return pn_session_get_incoming_capacity(self._impl)
2568
2569 - def _set_incoming_capacity(self, capacity):
2570 pn_session_set_incoming_capacity(self._impl, capacity)
2571 2572 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) 2573
2574 - def _get_outgoing_window(self):
2575 return pn_session_get_outgoing_window(self._impl)
2576
2577 - def _set_outgoing_window(self, window):
2578 pn_session_set_outgoing_window(self._impl, window)
2579 2580 outgoing_window = property(_get_outgoing_window, _set_outgoing_window) 2581 2582 @property
2583 - def outgoing_bytes(self):
2584 return pn_session_outgoing_bytes(self._impl)
2585 2586 @property
2587 - def incoming_bytes(self):
2588 return pn_session_incoming_bytes(self._impl)
2589
2590 - def open(self):
2591 pn_session_open(self._impl)
2592
2593 - def close(self):
2594 self._update_cond() 2595 pn_session_close(self._impl)
2596
2597 - def next(self, mask):
2598 return Session.wrap(pn_session_next(self._impl, mask))
2599 2600 @property
2601 - def state(self):
2602 return pn_session_state(self._impl)
2603 2604 @property
2605 - def connection(self):
2606 return Connection.wrap(pn_session_connection(self._impl))
2607
2608 - def sender(self, name):
2609 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2610
2611 - def receiver(self, name):
2612 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2613
2614 - def free(self):
2615 pn_session_free(self._impl)
2616
2617 -class LinkException(ProtonException):
2618 pass
2619 2802
2803 -class Terminus(object):
2804 2805 UNSPECIFIED = PN_UNSPECIFIED 2806 SOURCE = PN_SOURCE 2807 TARGET = PN_TARGET 2808 COORDINATOR = PN_COORDINATOR 2809 2810 NONDURABLE = PN_NONDURABLE 2811 CONFIGURATION = PN_CONFIGURATION 2812 DELIVERIES = PN_DELIVERIES 2813 2814 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED 2815 DIST_MODE_COPY = PN_DIST_MODE_COPY 2816 DIST_MODE_MOVE = PN_DIST_MODE_MOVE 2817 2818 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK 2819 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION 2820 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION 2821 EXPIRE_NEVER = PN_EXPIRE_NEVER 2822
2823 - def __init__(self, impl):
2824 self._impl = impl
2825
2826 - def _check(self, err):
2827 if err < 0: 2828 exc = EXCEPTIONS.get(err, LinkException) 2829 raise exc("[%s]" % err) 2830 else: 2831 return err
2832
2833 - def _get_type(self):
2834 return pn_terminus_get_type(self._impl)
2835 - def _set_type(self, type):
2836 self._check(pn_terminus_set_type(self._impl, type))
2837 type = property(_get_type, _set_type) 2838
2839 - def _get_address(self):
2840 """The address that identifies the source or target node""" 2841 return utf82unicode(pn_terminus_get_address(self._impl))
2842 - def _set_address(self, address):
2843 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2844 address = property(_get_address, _set_address) 2845
2846 - def _get_durability(self):
2847 return pn_terminus_get_durability(self._impl)
2848 - def _set_durability(self, seconds):
2849 self._check(pn_terminus_set_durability(self._impl, seconds))
2850 durability = property(_get_durability, _set_durability) 2851
2852 - def _get_expiry_policy(self):
2853 return pn_terminus_get_expiry_policy(self._impl)
2854 - def _set_expiry_policy(self, seconds):
2855 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2856 expiry_policy = property(_get_expiry_policy, _set_expiry_policy) 2857
2858 - def _get_timeout(self):
2859 return pn_terminus_get_timeout(self._impl)
2860 - def _set_timeout(self, seconds):
2861 self._check(pn_terminus_set_timeout(self._impl, seconds))
2862 timeout = property(_get_timeout, _set_timeout) 2863
2864 - def _is_dynamic(self):
2865 """Indicates whether the source or target node was dynamically 2866 created""" 2867 return pn_terminus_is_dynamic(self._impl)
2868 - def _set_dynamic(self, dynamic):
2869 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2870 dynamic = property(_is_dynamic, _set_dynamic) 2871
2872 - def _get_distribution_mode(self):
2873 return pn_terminus_get_distribution_mode(self._impl)
2874 - def _set_distribution_mode(self, mode):
2875 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2876 distribution_mode = property(_get_distribution_mode, _set_distribution_mode) 2877 2878 @property
2879 - def properties(self):
2880 """Properties of a dynamic source or target.""" 2881 return Data(pn_terminus_properties(self._impl))
2882 2883 @property
2884 - def capabilities(self):
2885 """Capabilities of the source or target.""" 2886 return Data(pn_terminus_capabilities(self._impl))
2887 2888 @property
2889 - def outcomes(self):
2890 return Data(pn_terminus_outcomes(self._impl))
2891 2892 @property
2893 - def filter(self):
2894 """A filter on a source allows the set of messages transfered over 2895 the link to be restricted""" 2896 return Data(pn_terminus_filter(self._impl))
2897
2898 - def copy(self, src):
2899 self._check(pn_terminus_copy(self._impl, src._impl))
2900
2901 -class Sender(Link):
2902 """ 2903 A link over which messages are sent. 2904 """ 2905
2906 - def offered(self, n):
2907 pn_link_offered(self._impl, n)
2908
2909 - def stream(self, data):
2910 """ 2911 Send specified data as part of the current delivery 2912 2913 @type data: binary 2914 @param data: data to send 2915 """ 2916 return self._check(pn_link_send(self._impl, data))
2917
2918 - def send(self, obj, tag=None):
2919 """ 2920 Send specified object over this sender; the object is expected to 2921 have a send() method on it that takes the sender and an optional 2922 tag as arguments. 2923 2924 Where the object is a Message, this will send the message over 2925 this link, creating a new delivery for the purpose. 2926 """ 2927 if hasattr(obj, 'send'): 2928 return obj.send(self, tag=tag) 2929 else: 2930 # treat object as bytes 2931 return self.stream(obj)
2932
2933 - def delivery_tag(self):
2934 if not hasattr(self, 'tag_generator'): 2935 def simple_tags(): 2936 count = 1 2937 while True: 2938 yield str(count) 2939 count += 1
2940 self.tag_generator = simple_tags() 2941 return next(self.tag_generator)
2942
2943 -class Receiver(Link):
2944 """ 2945 A link over which messages are received. 2946 """ 2947
2948 - def flow(self, n):
2949 """Increases the credit issued to the remote sender by the specified number of messages.""" 2950 pn_link_flow(self._impl, n)
2951
2952 - def recv(self, limit):
2953 n, binary = pn_link_recv(self._impl, limit) 2954 if n == PN_EOS: 2955 return None 2956 else: 2957 self._check(n) 2958 return binary
2959
2960 - def drain(self, n):
2961 pn_link_drain(self._impl, n)
2962
2963 - def draining(self):
2964 return pn_link_draining(self._impl)
2965
2966 -class NamedInt(int):
2967 2968 values = {} 2969
2970 - def __new__(cls, i, name):
2971 ni = super(NamedInt, cls).__new__(cls, i) 2972 cls.values[i] = ni 2973 return ni
2974
2975 - def __init__(self, i, name):
2976 self.name = name
2977
2978 - def __repr__(self):
2979 return self.name
2980
2981 - def __str__(self):
2982 return self.name
2983 2984 @classmethod
2985 - def get(cls, i):
2986 return cls.values.get(i, i)
2987
2988 -class DispositionType(NamedInt):
2989 values = {}
2990
2991 -class Disposition(object):
2992 2993 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") 2994 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") 2995 REJECTED = DispositionType(PN_REJECTED, "REJECTED") 2996 RELEASED = DispositionType(PN_RELEASED, "RELEASED") 2997 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") 2998
2999 - def __init__(self, impl, local):
3000 self._impl = impl 3001 self.local = local 3002 self._data = None 3003 self._condition = None 3004 self._annotations = None
3005 3006 @property
3007 - def type(self):
3008 return DispositionType.get(pn_disposition_type(self._impl))
3009
3010 - def _get_section_number(self):
3011 return pn_disposition_get_section_number(self._impl)
3012 - def _set_section_number(self, n):
3013 pn_disposition_set_section_number(self._impl, n)
3014 section_number = property(_get_section_number, _set_section_number) 3015
3016 - def _get_section_offset(self):
3017 return pn_disposition_get_section_offset(self._impl)
3018 - def _set_section_offset(self, n):
3019 pn_disposition_set_section_offset(self._impl, n)
3020 section_offset = property(_get_section_offset, _set_section_offset) 3021
3022 - def _get_failed(self):
3023 return pn_disposition_is_failed(self._impl)
3024 - def _set_failed(self, b):
3025 pn_disposition_set_failed(self._impl, b)
3026 failed = property(_get_failed, _set_failed) 3027
3028 - def _get_undeliverable(self):
3029 return pn_disposition_is_undeliverable(self._impl)
3030 - def _set_undeliverable(self, b):
3031 pn_disposition_set_undeliverable(self._impl, b)
3032 undeliverable = property(_get_undeliverable, _set_undeliverable) 3033
3034 - def _get_data(self):
3035 if self.local: 3036 return self._data 3037 else: 3038 return dat2obj(pn_disposition_data(self._impl))
3039 - def _set_data(self, obj):
3040 if self.local: 3041 self._data = obj 3042 else: 3043 raise AttributeError("data attribute is read-only")
3044 data = property(_get_data, _set_data) 3045
3046 - def _get_annotations(self):
3047 if self.local: 3048 return self._annotations 3049 else: 3050 return dat2obj(pn_disposition_annotations(self._impl))
3051 - def _set_annotations(self, obj):
3052 if self.local: 3053 self._annotations = obj 3054 else: 3055 raise AttributeError("annotations attribute is read-only")
3056 annotations = property(_get_annotations, _set_annotations) 3057
3058 - def _get_condition(self):
3059 if self.local: 3060 return self._condition 3061 else: 3062 return cond2obj(pn_disposition_condition(self._impl))
3063 - def _set_condition(self, obj):
3064 if self.local: 3065 self._condition = obj 3066 else: 3067 raise AttributeError("condition attribute is read-only")
3068 condition = property(_get_condition, _set_condition)
3069
3070 -class Delivery(Wrapper):
3071 """ 3072 Tracks and/or records the delivery of a message over a link. 3073 """ 3074 3075 RECEIVED = Disposition.RECEIVED 3076 ACCEPTED = Disposition.ACCEPTED 3077 REJECTED = Disposition.REJECTED 3078 RELEASED = Disposition.RELEASED 3079 MODIFIED = Disposition.MODIFIED 3080 3081 @staticmethod
3082 - def wrap(impl):
3083 if impl is None: 3084 return None 3085 else: 3086 return Delivery(impl)
3087
3088 - def __init__(self, impl):
3089 Wrapper.__init__(self, impl, pn_delivery_attachments)
3090
3091 - def _init(self):
3092 self.local = Disposition(pn_delivery_local(self._impl), True) 3093 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3094 3095 @property
3096 - def tag(self):
3097 """The identifier for the delivery.""" 3098 return pn_delivery_tag(self._impl)
3099 3100 @property
3101 - def writable(self):
3102 """Returns true for an outgoing delivery to which data can now be written.""" 3103 return pn_delivery_writable(self._impl)
3104 3105 @property
3106 - def readable(self):
3107 """Returns true for an incoming delivery that has data to read.""" 3108 return pn_delivery_readable(self._impl)
3109 3110 @property
3111 - def updated(self):
3112 """Returns true if the state of the delivery has been updated 3113 (e.g. it has been settled and/or accepted, rejected etc).""" 3114 return pn_delivery_updated(self._impl)
3115
3116 - def update(self, state):
3117 """ 3118 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED. 3119 """ 3120 obj2dat(self.local._data, pn_disposition_data(self.local._impl)) 3121 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) 3122 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) 3123 pn_delivery_update(self._impl, state)
3124 3125 @property
3126 - def pending(self):
3127 return pn_delivery_pending(self._impl)
3128 3129 @property
3130 - def partial(self):
3131 """ 3132 Returns true for an incoming delivery if not all the data is 3133 yet available. 3134 """ 3135 return pn_delivery_partial(self._impl)
3136 3137 @property
3138 - def local_state(self):
3139 """Returns the local state of the delivery.""" 3140 return DispositionType.get(pn_delivery_local_state(self._impl))
3141 3142 @property
3143 - def remote_state(self):
3144 """ 3145 Returns the state of the delivery as indicated by the remote 3146 peer. 3147 """ 3148 return DispositionType.get(pn_delivery_remote_state(self._impl))
3149 3150 @property
3151 - def settled(self):
3152 """ 3153 Returns true if the delivery has been settled by the remote peer. 3154 """ 3155 return pn_delivery_settled(self._impl)
3156
3157 - def settle(self):
3158 """ 3159 Settles the delivery locally. This indicates the aplication 3160 considers the delivery complete and does not wish to receive any 3161 further events about it. Every delivery should be settled locally. 3162 """ 3163 pn_delivery_settle(self._impl)
3164 3165 @property
3166 - def work_next(self):
3167 return Delivery.wrap(pn_work_next(self._impl))
3168 3169 @property 3175 3176 @property
3177 - def session(self):
3178 """ 3179 Returns the session over which the delivery was sent or received. 3180 """ 3181 return self.link.session
3182 3183 @property
3184 - def connection(self):
3185 """ 3186 Returns the connection over which the delivery was sent or received. 3187 """ 3188 return self.session.connection
3189 3190 @property
3191 - def transport(self):
3192 return self.connection.transport
3193
3194 -class TransportException(ProtonException):
3195 pass
3196
3197 -class TraceAdapter:
3198
3199 - def __init__(self, tracer):
3200 self.tracer = tracer
3201
3202 - def __call__(self, trans_impl, message):
3203 self.tracer(Transport.wrap(trans_impl), message)
3204
3205 -class Transport(Wrapper):
3206 3207 TRACE_OFF = PN_TRACE_OFF 3208 TRACE_DRV = PN_TRACE_DRV 3209 TRACE_FRM = PN_TRACE_FRM 3210 TRACE_RAW = PN_TRACE_RAW 3211 3212 CLIENT = 1 3213 SERVER = 2 3214 3215 @staticmethod
3216 - def wrap(impl):
3217 if impl is None: 3218 return None 3219 else: 3220 return Transport(_impl=impl)
3221
3222 - def __init__(self, mode=None, _impl = pn_transport):
3223 Wrapper.__init__(self, _impl, pn_transport_attachments) 3224 if mode == Transport.SERVER: 3225 pn_transport_set_server(self._impl) 3226 elif mode is None or mode==Transport.CLIENT: 3227 pass 3228 else: 3229 raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
3230
3231 - def _init(self):
3232 self._sasl = None 3233 self._ssl = None
3234
3235 - def _check(self, err):
3236 if err < 0: 3237 exc = EXCEPTIONS.get(err, TransportException) 3238 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl)))) 3239 else: 3240 return err
3241
3242 - def _set_tracer(self, tracer):
3243 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
3244
3245 - def _get_tracer(self):
3246 adapter = pn_transport_get_pytracer(self._impl) 3247 if adapter: 3248 return adapter.tracer 3249 else: 3250 return None
3251 3252 tracer = property(_get_tracer, _set_tracer, 3253 doc=""" 3254 A callback for trace logging. The callback is passed the transport and log message. 3255 """) 3256
3257 - def log(self, message):
3258 pn_transport_log(self._impl, message)
3259
3260 - def require_auth(self, bool):
3261 pn_transport_require_auth(self._impl, bool)
3262 3263 @property
3264 - def authenticated(self):
3265 return pn_transport_is_authenticated(self._impl)
3266
3267 - def require_encryption(self, bool):
3268 pn_transport_require_encryption(self._impl, bool)
3269 3270 @property
3271 - def encrypted(self):
3272 return pn_transport_is_encrypted(self._impl)
3273 3274 @property
3275 - def user(self):
3276 return pn_transport_get_user(self._impl)
3277
3278 - def bind(self, connection):
3279 """Assign a connection to the transport""" 3280 self._check(pn_transport_bind(self._impl, connection._impl))
3281
3282 - def unbind(self):
3283 """Release the connection""" 3284 self._check(pn_transport_unbind(self._impl))
3285
3286 - def trace(self, n):
3287 pn_transport_trace(self._impl, n)
3288
3289 - def tick(self, now):
3290 """Process any timed events (like heartbeat generation). 3291 now = seconds since epoch (float). 3292 """ 3293 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
3294
3295 - def capacity(self):
3296 c = pn_transport_capacity(self._impl) 3297 if c >= PN_EOS: 3298 return c 3299 else: 3300 return self._check(c)
3301
3302 - def push(self, binary):
3303 n = self._check(pn_transport_push(self._impl, binary)) 3304 if n != len(binary): 3305 raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
3306
3307 - def close_tail(self):
3308 self._check(pn_transport_close_tail(self._impl))
3309
3310 - def pending(self):
3311 p = pn_transport_pending(self._impl) 3312 if p >= PN_EOS: 3313 return p 3314 else: 3315 return self._check(p)
3316
3317 - def peek(self, size):
3318 cd, out = pn_transport_peek(self._impl, size) 3319 if cd == PN_EOS: 3320 return None 3321 else: 3322 self._check(cd) 3323 return out
3324
3325 - def pop(self, size):
3326 pn_transport_pop(self._impl, size)
3327
3328 - def close_head(self):
3329 self._check(pn_transport_close_head(self._impl))
3330 3331 @property
3332 - def closed(self):
3333 return pn_transport_closed(self._impl)
3334 3335 # AMQP 1.0 max-frame-size
3336 - def _get_max_frame_size(self):
3337 return pn_transport_get_max_frame(self._impl)
3338
3339 - def _set_max_frame_size(self, value):
3340 pn_transport_set_max_frame(self._impl, value)
3341 3342 max_frame_size = property(_get_max_frame_size, _set_max_frame_size, 3343 doc=""" 3344 Sets the maximum size for received frames (in bytes). 3345 """) 3346 3347 @property
3348 - def remote_max_frame_size(self):
3349 return pn_transport_get_remote_max_frame(self._impl)
3350
3351 - def _get_channel_max(self):
3352 return pn_transport_get_channel_max(self._impl)
3353
3354 - def _set_channel_max(self, value):
3355 if pn_transport_set_channel_max(self._impl, value): 3356 raise SessionException("Too late to change channel max.")
3357 3358 channel_max = property(_get_channel_max, _set_channel_max, 3359 doc=""" 3360 Sets the maximum channel that may be used on the transport. 3361 """) 3362 3363 @property
3364 - def remote_channel_max(self):
3365 return pn_transport_remote_channel_max(self._impl)
3366 3367 # AMQP 1.0 idle-time-out
3368 - def _get_idle_timeout(self):
3369 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3370
3371 - def _set_idle_timeout(self, sec):
3372 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3373 3374 idle_timeout = property(_get_idle_timeout, _set_idle_timeout, 3375 doc=""" 3376 The idle timeout of the connection (float, in seconds). 3377 """) 3378 3379 @property
3380 - def remote_idle_timeout(self):
3381 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3382 3383 @property
3384 - def frames_output(self):
3385 return pn_transport_get_frames_output(self._impl)
3386 3387 @property
3388 - def frames_input(self):
3389 return pn_transport_get_frames_input(self._impl)
3390
3391 - def sasl(self):
3392 return SASL(self)
3393
3394 - def ssl(self, domain=None, session_details=None):
3395 # SSL factory (singleton for this transport) 3396 if not self._ssl: 3397 self._ssl = SSL(self, domain, session_details) 3398 return self._ssl
3399 3400 @property
3401 - def condition(self):
3402 return cond2obj(pn_transport_condition(self._impl))
3403 3404 @property
3405 - def connection(self):
3406 return Connection.wrap(pn_transport_connection(self._impl))
3407
3408 -class SASLException(TransportException):
3409 pass
3410
3411 -class SASL(Wrapper):
3412 3413 OK = PN_SASL_OK 3414 AUTH = PN_SASL_AUTH 3415 SYS = PN_SASL_SYS 3416 PERM = PN_SASL_PERM 3417 TEMP = PN_SASL_TEMP 3418 3419 @staticmethod
3420 - def extended():
3421 return pn_sasl_extended()
3422
3423 - def __init__(self, transport):
3424 Wrapper.__init__(self, transport._impl, pn_transport_attachments) 3425 self._sasl = pn_sasl(transport._impl)
3426
3427 - def _check(self, err):
3428 if err < 0: 3429 exc = EXCEPTIONS.get(err, SASLException) 3430 raise exc("[%s]" % (err)) 3431 else: 3432 return err
3433 3434 @property
3435 - def user(self):
3436 return pn_sasl_get_user(self._sasl)
3437 3438 @property
3439 - def mech(self):
3440 return pn_sasl_get_mech(self._sasl)
3441 3442 @property
3443 - def outcome(self):
3444 outcome = pn_sasl_outcome(self._sasl) 3445 if outcome == PN_SASL_NONE: 3446 return None 3447 else: 3448 return outcome
3449
3450 - def allowed_mechs(self, mechs):
3451 pn_sasl_allowed_mechs(self._sasl, mechs)
3452
3453 - def _get_allow_insecure_mechs(self):
3454 return pn_sasl_get_allow_insecure_mechs(self._sasl)
3455
3456 - def _set_allow_insecure_mechs(self, insecure):
3457 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
3458 3459 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs, 3460 doc=""" 3461 Allow unencrypted cleartext passwords (PLAIN mech) 3462 """) 3463
3464 - def done(self, outcome):
3465 pn_sasl_done(self._sasl, outcome)
3466
3467 - def config_name(self, name):
3468 pn_sasl_config_name(self._sasl, name)
3469
3470 - def config_path(self, path):
3471 pn_sasl_config_path(self._sasl, path)
3472
3473 -class SSLException(TransportException):
3474 pass
3475
3476 -class SSLUnavailable(SSLException):
3477 pass
3478
3479 -class SSLDomain(object):
3480 3481 MODE_CLIENT = PN_SSL_MODE_CLIENT 3482 MODE_SERVER = PN_SSL_MODE_SERVER 3483 VERIFY_PEER = PN_SSL_VERIFY_PEER 3484 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME 3485 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER 3486
3487 - def __init__(self, mode):
3488 self._domain = pn_ssl_domain(mode) 3489 if self._domain is None: 3490 raise SSLUnavailable()
3491
3492 - def _check(self, err):
3493 if err < 0: 3494 exc = EXCEPTIONS.get(err, SSLException) 3495 raise exc("SSL failure.") 3496 else: 3497 return err
3498
3499 - def set_credentials(self, cert_file, key_file, password):
3500 return self._check( pn_ssl_domain_set_credentials(self._domain, 3501 cert_file, key_file, 3502 password) )
3503 - def set_trusted_ca_db(self, certificate_db):
3504 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain, 3505 certificate_db) )
3506 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3507 return self._check( pn_ssl_domain_set_peer_authentication(self._domain, 3508 verify_mode, 3509 trusted_CAs) )
3510
3511 - def allow_unsecured_client(self):
3512 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3513
3514 - def __del__(self):
3515 pn_ssl_domain_free(self._domain)
3516
3517 -class SSL(object):
3518 3519 @staticmethod
3520 - def present():
3521 return pn_ssl_present()
3522
3523 - def _check(self, err):
3524 if err < 0: 3525 exc = EXCEPTIONS.get(err, SSLException) 3526 raise exc("SSL failure.") 3527 else: 3528 return err
3529
3530 - def __new__(cls, transport, domain, session_details=None):
3531 """Enforce a singleton SSL object per Transport""" 3532 if transport._ssl: 3533 # unfortunately, we've combined the allocation and the configuration in a 3534 # single step. So catch any attempt by the application to provide what 3535 # may be a different configuration than the original (hack) 3536 ssl = transport._ssl 3537 if (domain and (ssl._domain is not domain) or 3538 session_details and (ssl._session_details is not session_details)): 3539 raise SSLException("Cannot re-configure existing SSL object!") 3540 else: 3541 obj = super(SSL, cls).__new__(cls) 3542 obj._domain = domain 3543 obj._session_details = session_details 3544 session_id = None 3545 if session_details: 3546 session_id = session_details.get_session_id() 3547 obj._ssl = pn_ssl( transport._impl ) 3548 if obj._ssl is None: 3549 raise SSLUnavailable() 3550 if domain: 3551 pn_ssl_init( obj._ssl, domain._domain, session_id ) 3552 transport._ssl = obj 3553 return transport._ssl
3554
3555 - def cipher_name(self):
3556 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 ) 3557 if rc: 3558 return name 3559 return None
3560
3561 - def protocol_name(self):
3562 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 ) 3563 if rc: 3564 return name 3565 return None
3566 3567 @property
3568 - def remote_subject(self):
3569 return pn_ssl_get_remote_subject( self._ssl )
3570 3571 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN 3572 RESUME_NEW = PN_SSL_RESUME_NEW 3573 RESUME_REUSED = PN_SSL_RESUME_REUSED 3574
3575 - def resume_status(self):
3576 return pn_ssl_resume_status( self._ssl )
3577
3578 - def _set_peer_hostname(self, hostname):
3579 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3580 - def _get_peer_hostname(self):
3581 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 ) 3582 self._check(err) 3583 return utf82unicode(name)
3584 peer_hostname = property(_get_peer_hostname, _set_peer_hostname, 3585 doc=""" 3586 Manage the expected name of the remote peer. Used to authenticate the remote. 3587 """)
3588
3589 3590 -class SSLSessionDetails(object):
3591 """ Unique identifier for the SSL session. Used to resume previous session on a new 3592 SSL connection. 3593 """ 3594
3595 - def __init__(self, session_id):
3596 self._session_id = session_id
3597
3598 - def get_session_id(self):
3599 return self._session_id
3600 3601 3602 wrappers = { 3603 "pn_void": lambda x: pn_void2py(x), 3604 "pn_pyref": lambda x: pn_void2py(x), 3605 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)), 3606 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)), 3607 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)), 3608 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)), 3609 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)), 3610 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x)) 3611 }
3612 3613 -class Collector:
3614
3615 - def __init__(self):
3616 self._impl = pn_collector()
3617
3618 - def put(self, obj, etype):
3619 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3620
3621 - def peek(self):
3622 return Event.wrap(pn_collector_peek(self._impl))
3623
3624 - def pop(self):
3625 ev = self.peek() 3626 pn_collector_pop(self._impl)
3627
3628 - def __del__(self):
3629 pn_collector_free(self._impl) 3630 del self._impl
3631
3632 -class EventType(object):
3633 3634 _lock = threading.Lock() 3635 _extended = 10000 3636 TYPES = {} 3637
3638 - def __init__(self, name=None, number=None, method=None):
3639 if name is None and number is None: 3640 raise TypeError("extended events require a name") 3641 try: 3642 self._lock.acquire() 3643 if name is None: 3644 name = pn_event_type_name(number) 3645 3646 if number is None: 3647 number = EventType._extended 3648 EventType._extended += 1 3649 3650 if method is None: 3651 method = "on_%s" % name 3652 3653 self.name = name 3654 self.number = number 3655 self.method = method 3656 3657 self.TYPES[number] = self 3658 finally: 3659 self._lock.release()
3660
3661 - def __repr__(self):
3662 return self.name
3663
3664 -def dispatch(handler, method, *args):
3665 m = getattr(handler, method, None) 3666 if m: 3667 return m(*args) 3668 elif hasattr(handler, "on_unhandled"): 3669 return handler.on_unhandled(method, *args)
3670
3671 -class EventBase(object):
3672
3673 - def __init__(self, clazz, context, type):
3674 self.clazz = clazz 3675 self.context = context 3676 self.type = type
3677
3678 - def dispatch(self, handler):
3679 return dispatch(handler, self.type.method, self)
3680
3681 -def _none(x): return None
3682 3683 DELEGATED = Constant("DELEGATED")
3684 3685 -def _core(number, method):
3686 return EventType(number=number, method=method)
3687
3688 -class Event(Wrapper, EventBase):
3689 3690 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init") 3691 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced") 3692 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final") 3693 3694 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task") 3695 3696 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init") 3697 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound") 3698 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound") 3699 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open") 3700 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close") 3701 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open") 3702 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close") 3703 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final") 3704 3705 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init") 3706 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open") 3707 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close") 3708 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open") 3709 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close") 3710 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final") 3711 3712 LINK_INIT = _core(PN_LINK_INIT, "on_link_init") 3713 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open") 3714 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close") 3715 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach") 3716 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open") 3717 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close") 3718 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach") 3719 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow") 3720 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final") 3721 3722 DELIVERY = _core(PN_DELIVERY, "on_delivery") 3723 3724 TRANSPORT = _core(PN_TRANSPORT, "on_transport") 3725 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error") 3726 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed") 3727 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed") 3728 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed") 3729 3730 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init") 3731 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated") 3732 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable") 3733 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable") 3734 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired") 3735 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error") 3736 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final") 3737 3738 @staticmethod
3739 - def wrap(impl, number=None):
3740 if impl is None: 3741 return None 3742 3743 if number is None: 3744 number = pn_event_type(impl) 3745 3746 event = Event(impl, number) 3747 3748 if isinstance(event.context, EventBase): 3749 return event.context 3750 else: 3751 return event
3752
3753 - def __init__(self, impl, number):
3754 Wrapper.__init__(self, impl, pn_event_attachments) 3755 self.__dict__["type"] = EventType.TYPES[number]
3756
3757 - def _init(self):
3758 pass
3759 3760 @property
3761 - def clazz(self):
3762 cls = pn_event_class(self._impl) 3763 if cls: 3764 return pn_class_name(cls) 3765 else: 3766 return None
3767 3768 @property
3769 - def context(self):
3770 """Returns the context object associated with the event. The type of this depend on the type of event.""" 3771 return wrappers[self.clazz](pn_event_context(self._impl))
3772
3773 - def dispatch(self, handler, type=None):
3774 type = type or self.type 3775 if isinstance(handler, WrappedHandler): 3776 pn_handler_dispatch(handler._impl, self._impl, type.number) 3777 else: 3778 result = dispatch(handler, type.method, self) 3779 if result != DELEGATED and hasattr(handler, "handlers"): 3780 for h in handler.handlers: 3781 self.dispatch(h, type)
3782 3783 3784 @property
3785 - def reactor(self):
3786 """Returns the reactor associated with the event.""" 3787 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3788 3789 @property
3790 - def transport(self):
3791 """Returns the transport associated with the event, or null if none is associated with it.""" 3792 return Transport.wrap(pn_event_transport(self._impl))
3793 3794 @property
3795 - def connection(self):
3796 """Returns the connection associated with the event, or null if none is associated with it.""" 3797 return Connection.wrap(pn_event_connection(self._impl))
3798 3799 @property
3800 - def session(self):
3801 """Returns the session associated with the event, or null if none is associated with it.""" 3802 return Session.wrap(pn_event_session(self._impl))
3803 3804 @property 3808 3809 @property
3810 - def sender(self):
3811 """Returns the sender link associated with the event, or null if 3812 none is associated with it. This is essentially an alias for 3813 link(), that does an additional checkon the type of the 3814 link.""" 3815 l = self.link 3816 if l and l.is_sender: 3817 return l 3818 else: 3819 return None
3820 3821 @property
3822 - def receiver(self):
3823 """Returns the receiver link associated with the event, or null if 3824 none is associated with it. This is essentially an alias for 3825 link(), that does an additional checkon the type of the link.""" 3826 l = self.link 3827 if l and l.is_receiver: 3828 return l 3829 else: 3830 return None
3831 3832 @property
3833 - def delivery(self):
3834 """Returns the delivery associated with the event, or null if none is associated with it.""" 3835 return Delivery.wrap(pn_event_delivery(self._impl))
3836
3837 - def __repr__(self):
3838 return "%s(%s)" % (self.type, self.context)
3839
3840 -class Handler(object):
3841
3842 - def on_unhandled(self, method, *args):
3843 pass
3844
3845 -class _cadapter:
3846
3847 - def __init__(self, handler, on_error=None):
3848 self.handler = handler 3849 self.on_error = on_error
3850
3851 - def dispatch(self, cevent, ctype):
3852 ev = Event.wrap(cevent, ctype) 3853 ev.dispatch(self.handler)
3854
3855 - def exception(self, exc, val, tb):
3856 if self.on_error is None: 3857 _compat.raise_(exc, val, tb) 3858 else: 3859 self.on_error((exc, val, tb))
3860
3861 -class WrappedHandler(Wrapper):
3862 3863 @staticmethod
3864 - def wrap(impl, on_error=None):
3865 if impl is None: 3866 return None 3867 else: 3868 handler = WrappedHandler(impl) 3869 handler.__dict__["on_error"] = on_error 3870 return handler
3871
3872 - def __init__(self, impl_or_constructor):
3873 Wrapper.__init__(self, impl_or_constructor)
3874
3875 - def _on_error(self, info):
3876 on_error = getattr(self, "on_error", None) 3877 if on_error is None: 3878 _compat.raise_(info[0], info[1], info[2]) 3879 else: 3880 on_error(info)
3881
3882 - def add(self, handler):
3883 if handler is None: return 3884 impl = _chandler(handler, self._on_error) 3885 pn_handler_add(self._impl, impl) 3886 pn_decref(impl)
3887
3888 - def clear(self):
3889 pn_handler_clear(self._impl)
3890
3891 -def _chandler(obj, on_error=None):
3892 if obj is None: 3893 return None 3894 elif isinstance(obj, WrappedHandler): 3895 impl = obj._impl 3896 pn_incref(impl) 3897 return impl 3898 else: 3899 return pn_pyhandler(_cadapter(obj, on_error))
3900
3901 -class Url(object):
3902 """ 3903 Simple URL parser/constructor, handles URLs of the form: 3904 3905 <scheme>://<user>:<password>@<host>:<port>/<path> 3906 3907 All components can be None if not specifeid in the URL string. 3908 3909 The port can be specified as a service name, e.g. 'amqp' in the 3910 URL string but Url.port always gives the integer value. 3911 3912 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps' 3913 @ivar user: Username 3914 @ivar password: Password 3915 @ivar host: Host name, ipv6 literal or ipv4 dotted quad. 3916 @ivar port: Integer port. 3917 @ivar host_port: Returns host:port 3918 """ 3919 3920 AMQPS = "amqps" 3921 AMQP = "amqp" 3922
3923 - class Port(int):
3924 """An integer port number that can be constructed from a service name string""" 3925
3926 - def __new__(cls, value):
3927 """@param value: integer port number or string service name.""" 3928 port = super(Url.Port, cls).__new__(cls, cls._port_int(value)) 3929 setattr(port, 'name', str(value)) 3930 return port
3931
3932 - def __eq__(self, x): return str(self) == x or int(self) == x
3933 - def __ne__(self, x): return not self == x
3934 - def __str__(self): return str(self.name)
3935 3936 @staticmethod
3937 - def _port_int(value):
3938 """Convert service, an integer or a service name, into an integer port number.""" 3939 try: 3940 return int(value) 3941 except ValueError: 3942 try: 3943 return socket.getservbyname(value) 3944 except socket.error: 3945 # Not every system has amqp/amqps defined as a service 3946 if value == Url.AMQPS: return 5671 3947 elif value == Url.AMQP: return 5672 3948 else: 3949 raise ValueError("Not a valid port number or service name: '%s'" % value)
3950
3951 - def __init__(self, url=None, defaults=True, **kwargs):
3952 """ 3953 @param url: URL string to parse. 3954 @param defaults: If true, fill in missing default values in the URL. 3955 If false, you can fill them in later by calling self.defaults() 3956 @param kwargs: scheme, user, password, host, port, path. 3957 If specified, replaces corresponding part in url string. 3958 """ 3959 if url: 3960 self._url = pn_url_parse(unicode2utf8(str(url))) 3961 if not self._url: raise ValueError("Invalid URL '%s'" % url) 3962 else: 3963 self._url = pn_url() 3964 for k in kwargs: # Let kwargs override values parsed from url 3965 getattr(self, k) # Check for invalid kwargs 3966 setattr(self, k, kwargs[k]) 3967 if defaults: self.defaults()
3968
3969 - class PartDescriptor(object):
3970 - def __init__(self, part):
3971 self.getter = globals()["pn_url_get_%s" % part] 3972 self.setter = globals()["pn_url_set_%s" % part]
3973 - def __get__(self, obj, type=None): return self.getter(obj._url)
3974 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
3975 3976 scheme = PartDescriptor('scheme') 3977 username = PartDescriptor('username') 3978 password = PartDescriptor('password') 3979 host = PartDescriptor('host') 3980 path = PartDescriptor('path') 3981
3982 - def _get_port(self):
3983 portstr = pn_url_get_port(self._url) 3984 return portstr and Url.Port(portstr)
3985
3986 - def _set_port(self, value):
3987 if value is None: pn_url_set_port(self._url, None) 3988 else: pn_url_set_port(self._url, str(Url.Port(value)))
3989 3990 port = property(_get_port, _set_port) 3991
3992 - def __str__(self): return pn_url_str(self._url)
3993
3994 - def __repr__(self): return "Url(%r)" % str(self)
3995
3996 - def __eq__(self, x): return str(self) == str(x)
3997 - def __ne__(self, x): return not self == x
3998
3999 - def __del__(self):
4000 pn_url_free(self._url); 4001 del self._url
4002
4003 - def defaults(self):
4004 """ 4005 Fill in missing values (scheme, host or port) with defaults 4006 @return: self 4007 """ 4008 self.scheme = self.scheme or self.AMQP 4009 self.host = self.host or '0.0.0.0' 4010 self.port = self.port or self.Port(self.scheme) 4011 return self
4012 4013 __all__ = [ 4014 "API_LANGUAGE", 4015 "IMPLEMENTATION_LANGUAGE", 4016 "ABORTED", 4017 "ACCEPTED", 4018 "AUTOMATIC", 4019 "PENDING", 4020 "MANUAL", 4021 "REJECTED", 4022 "RELEASED", 4023 "MODIFIED", 4024 "SETTLED", 4025 "UNDESCRIBED", 4026 "Array", 4027 "Collector", 4028 "Condition", 4029 "Connection", 4030 "Data", 4031 "Delivery", 4032 "Disposition", 4033 "Described", 4034 "Endpoint", 4035 "Event", 4036 "Handler", 4037 "Link", 4038 "Message", 4039 "MessageException", 4040 "Messenger", 4041 "MessengerException", 4042 "ProtonException", 4043 "VERSION_MAJOR", 4044 "VERSION_MINOR", 4045 "Receiver", 4046 "SASL", 4047 "Sender", 4048 "Session", 4049 "SessionException", 4050 "SSL", 4051 "SSLDomain", 4052 "SSLSessionDetails", 4053 "SSLUnavailable", 4054 "SSLException", 4055 "Terminus", 4056 "Timeout", 4057 "Interrupt", 4058 "Transport", 4059 "TransportException", 4060 "Url", 4061 "char", 4062 "dispatch", 4063 "symbol", 4064 "timestamp", 4065 "ulong" 4066 ] 4067