qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r504590 - in /incubator/qpid/branches/qpid.0-9/python: qpid/client.py qpid/codec.py qpid/peer.py qpid/reference.py qpid/testlib.py tests/message.py
Date Wed, 07 Feb 2007 15:36:02 GMT
Author: gsim
Date: Wed Feb  7 07:36:01 2007
New Revision: 504590

URL: http://svn.apache.org/viewvc?view=rev&rev=504590
Log:
Added support for receiving and sending of references
Added asynchronous mode to channels (responses can be tracked via a future, rather than blocking
on each request)
Added ability to override server suggested connection tune params
Added two tests for reference functionality (more to follow)


Added:
    incubator/qpid/branches/qpid.0-9/python/qpid/reference.py   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/python/qpid/client.py
    incubator/qpid/branches/qpid.0-9/python/qpid/codec.py
    incubator/qpid/branches/qpid.0-9/python/qpid/peer.py
    incubator/qpid/branches/qpid.0-9/python/qpid/testlib.py
    incubator/qpid/branches/qpid.0-9/python/tests/message.py

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/client.py?view=diff&rev=504590&r1=504589&r2=504590
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/client.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/client.py Wed Feb  7 07:36:01 2007
@@ -28,6 +28,7 @@
 from connection import Connection, Frame, connect
 from spec import load
 from queue import Queue
+from reference import ReferenceId, References
 
 
 class Client:
@@ -69,13 +70,14 @@
       self.lock.release()
     return q
 
-  def start(self, response, mechanism="AMQPLAIN", locale="en_US"):
+  def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None):
     self.mechanism = mechanism
     self.response = response
     self.locale = locale
+    self.tune_params = tune_params
 
     self.conn = Connection(connect(self.host, self.port), self.spec)
-    self.peer = Peer(self.conn, ClientDelegate(self))
+    self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
 
     self.conn.init()
     self.peer.start()
@@ -85,6 +87,9 @@
   def channel(self, id):
     return self.peer.channel(id)
 
+  def opened(self, ch):
+    ch.references = References()
+
 class ClientDelegate(Delegate):
 
   def __init__(self, client):
@@ -97,11 +102,28 @@
                  locale=self.client.locale)
 
   def connection_tune(self, ch, msg):
-    msg.tune_ok(*msg.frame.args)
+    if self.client.tune_params:
+      #todo: just override the params, i.e. don't require them
+      #      all to be included in tune_params
+      msg.tune_ok(**self.client.tune_params)
+    else:  
+      msg.tune_ok(*msg.frame.args)
     self.client.started.set()
 
   def message_transfer(self, ch, msg):
-    self.client.queue(msg.destination).put(msg)
+    if isinstance(msg.body, ReferenceId):
+      self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
+    else:
+      self.client.queue(msg.destination).put(msg)
+
+  def message_open(self, ch, msg):
+    ch.references.open(msg.reference)
+
+  def message_close(self, ch, msg):
+    ch.references.close(msg.reference)
+
+  def message_append(self, ch, msg):
+    ch.references.get(msg.reference).append(msg.bytes)
 
   def basic_deliver(self, ch, msg):
     self.client.queue(msg.consumer_tag).put(msg)

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/codec.py?view=diff&rev=504590&r1=504589&r2=504590
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/codec.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/codec.py Wed Feb  7 07:36:01 2007
@@ -26,6 +26,7 @@
 
 from cStringIO import StringIO
 from struct import *
+from reference import ReferenceId
 
 class EOF(Exception):
   pass
@@ -195,14 +196,24 @@
     return self.decode_longlong()
 
   def encode_content(self, s):
-    # XXX
-    self.encode_octet(0)
-    self.encode_longstr(s)
-
-  def decode_content(self):
-    # XXX
-    self.decode_octet()
-    return self.decode_longstr()
+    # content can be passed as a string in which case it is assumed to
+    # be inline data, or as an instance of ReferenceId indicating it is
+    # a reference id    
+    if isinstance(s, ReferenceId):
+      self.encode_octet(1)
+      self.encode_longstr(s.id)
+    else:      
+      self.encode_octet(0)
+      self.encode_longstr(s)
+
+  def decode_content(self):    
+    # return a string for inline data and a ReferenceId instance for
+    # references
+    type = self.decode_octet()
+    if type == 0:
+      return self.decode_longstr()
+    else:
+      return ReferenceId(self.decode_longstr())
 
 def test(type, value):
   if isinstance(value, (list, tuple)):

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/peer.py?view=diff&rev=504590&r1=504589&r2=504590
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/peer.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/peer.py Wed Feb  7 07:36:01 2007
@@ -50,13 +50,14 @@
 
 class Peer:
 
-  def __init__(self, conn, delegate):
+  def __init__(self, conn, delegate, channel_callback=None):
     self.conn = conn
     self.delegate = delegate
     self.outgoing = Queue(0)
     self.work = Queue(0)
     self.channels = {}
     self.lock = thread.allocate_lock()
+    self.channel_callback = channel_callback #notified when channels are created
 
   def channel(self, id):
     self.lock.acquire()
@@ -66,6 +67,8 @@
       except KeyError:
         ch = Channel(id, self.outgoing, self.conn.spec)
         self.channels[id] = ch
+        if self.channel_callback:
+          self.channel_callback(ch)
     finally:
       self.lock.release()
     return ch
@@ -177,6 +180,7 @@
 
     # XXX: better switch
     self.reliable = True
+    self.synchronous = True
 
   def close(self, reason):
     if self.closed:
@@ -238,6 +242,12 @@
     content = kwargs.pop("content", None)
     frame = Method(type, type.arguments(*args, **kwargs))
     if self.reliable:
+      if not self.synchronous:
+        future = Future()
+        self.request(frame, future.put_response, content)
+        if not frame.method.responses: return None
+        else: return future
+      
       self.request(frame, self.queue_response, content)
       if not frame.method.responses:
         return None
@@ -304,3 +314,18 @@
     buf.write(content)
     read += len(content)
   return Content(buf.getvalue(), children, header.properties.copy())
+
+class Future:
+  def __init__(self):
+    self.completed = threading.Event()
+
+  def put_response(self, channel, response):
+    self.response = response
+    self.completed.set()
+
+  def get_response(self, timeout=None):
+    self.completed.wait(timeout)
+    return self.response
+
+  def is_complete(self):
+    return self.completed.isSet()

Added: incubator/qpid/branches/qpid.0-9/python/qpid/reference.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/reference.py?view=auto&rev=504590
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/reference.py (added)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/reference.py Wed Feb  7 07:36:01 2007
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Support for amqp 'reference' content (as opposed to inline content)
+"""
+
+import threading
+from queue import Queue, Closed
+
+class NotOpened(Exception): pass
+
+class AlreadyOpened(Exception): pass
+
+"""
+A representation of a reference id; can be passed wherever amqp
+content is required in place of inline data
+"""
+class ReferenceId:
+
+    def __init__(self, id):
+        self.id = id
+
+"""
+Holds content received through 'reference api'. Instances of this
+class will be placed in the consumers queue on receiving a transfer
+(assuming the reference has been opened). Data can be retrieved in
+chunks (as append calls are received) or in full (after reference has
+been closed signalling data is complete).
+"""
+
+class Reference:
+
+    def __init__(self, id):
+        self.id = id
+        self.chunks = Queue(0)
+
+    def close(self):
+        self.chunks.close()
+
+    def append(self, bytes):    
+        self.chunks.put(bytes)
+
+    def get_chunk(self):
+        return self.chunks.get()
+
+    def get_complete(self):
+        data = ""
+        for chunk in self:
+            data += chunk
+        return data
+
+    def next(self):
+        try:
+            return self.get_chunk()
+        except Closed, e:
+            raise StopIteration
+
+    def __iter__(self):
+        return self
+
+"""
+Manages a set of opened references. New references can be opened and
+existing references can be retrieved or closed.
+"""
+class References:
+
+    def __init__(self):
+        self.map = {}
+        self.lock = threading.Lock()
+
+    def get(self, id):
+        self.lock.acquire()
+        try:
+            try:
+                ref = self.map[id]
+            except KeyError:
+                raise NotOpened()
+        finally:
+            self.lock.release()
+        return ref
+
+    def open(self, id):
+        self.lock.acquire()
+        try:
+            if id in self.map: raise AlreadyOpened()
+            self.map[id] = Reference(id)
+        finally:
+            self.lock.release()
+
+
+    def close(self, id):
+        self.get(id).close()
+        self.lock.acquire()
+        try:
+            del map[id]
+        finally:
+            self.lock.release()
+        

Propchange: incubator/qpid/branches/qpid.0-9/python/qpid/reference.py
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/testlib.py?view=diff&rev=504590&r1=504589&r2=504590
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/testlib.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/testlib.py Wed Feb  7 07:36:01 2007
@@ -129,7 +129,7 @@
             print "======================================="
         return result.wasSuccessful()
 
-    def connect(self, host=None, port=None, spec=None, errata=None, user=None, password=None):
+    def connect(self, host=None, port=None, spec=None, errata=None, user=None, password=None, tune_params=None):
         """Connect to the broker, returns a qpid.client.Client"""
         host = host or self.host
         port = port or self.port
@@ -138,7 +138,7 @@
         user = user or self.user
         password = password or self.password
         client = qpid.client.Client(host, port, qpid.spec.load(spec, errata))
-        client.start({"LOGIN": user, "PASSWORD": password})
+        client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
         return client
 
 

Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=504590&r1=504589&r2=504590
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Wed Feb  7 07:36:01 2007
@@ -20,6 +20,7 @@
 from qpid.queue import Empty
 from qpid.content import Content
 from qpid.testlib import testrunner, TestBase
+from qpid.reference import Reference, ReferenceId
 
 class MessageTests(TestBase):
     """Tests for 'methods' on the amqp message 'class'"""
@@ -413,3 +414,66 @@
         reply = channel.message_get(no_ack=True)
         self.assertEqual(reply.method.klass.name, "message")
         self.assertEqual(reply.method.name, "get-empty")
+
+    def test_reference_simple(self):
+        """
+        Test basic ability to handle references
+        """
+        channel = self.channel
+        channel.queue_declare(queue="ref_queue", exclusive=True)
+        channel.message_consume(queue="ref_queue", destination="c1")
+        queue = self.client.queue("c1")
+
+        refId = "myref"
+        channel.message_open(reference=refId)
+        channel.message_append(reference=refId, bytes="abcd")
+        channel.synchronous = False
+        ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
+        channel.synchronous = True
+
+        channel.message_append(reference=refId, bytes="efgh")
+        channel.message_append(reference=refId, bytes="ijkl")
+        channel.message_close(reference=refId)
+
+        #first, wait for the ok for the transfer
+        ack.get_response(timeout=1)
+        
+        msg = queue.get(timeout=1)
+        if isinstance(msg, Reference):
+            #should we force broker to deliver as reference by frame
+            #size limit? or test that separately? for compliance, 
+            #allowing either seems best for now...            
+            data = msg.get_complete()
+        else:
+            data = msg.body
+        self.assertEquals("abcdefghijkl", data)
+
+
+    def test_reference_large(self):
+        """
+        Test basic ability to handle references whose content exceeds max frame size
+        """
+        channel = self.channel
+        self.queue_declare(queue="ref_queue")
+
+        #generate a big data string (> max frame size of consumer):
+        data = "0123456789"
+        for i in range(0, 10):
+            data += data
+        #send it inline    
+        channel.synchronous = False
+        ack = channel.message_transfer(routing_key="ref_queue", body=data)
+        channel.synchronous = True
+        #first, wait for the ok for the transfer
+        ack.get_response(timeout=1)
+
+        #create a new connection for consumer, with specific max frame size (< data)
+        other = self.connect(tune_params={"channel_max":10, "frame_max":5120, "heartbeat":0})
+        ch2 = other.channel(1)
+        ch2.channel_open()
+        ch2.message_consume(queue="ref_queue", destination="c1")
+        queue = other.queue("c1")
+        
+        msg = queue.get(timeout=1)
+        self.assertTrue(isinstance(msg, Reference))
+        self.assertEquals(data, msg.get_complete())



Mime
View raw message