qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r668345 - in /incubator/qpid/trunk/qpid/python: qpid/framer.py tests/framer.py
Date Mon, 16 Jun 2008 23:27:54 GMT
Author: rhs
Date: Mon Jun 16 16:27:54 2008
New Revision: 668345

URL: http://svn.apache.org/viewvc?rev=668345&view=rev
Log:
QPID-1143: added buffering; we now issue only one write per assembly

Modified:
    incubator/qpid/trunk/qpid/python/qpid/framer.py
    incubator/qpid/trunk/qpid/python/tests/framer.py

Modified: incubator/qpid/trunk/qpid/python/qpid/framer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/framer.py?rev=668345&r1=668344&r2=668345&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/framer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/framer.py Mon Jun 16 16:27:54 2008
@@ -20,7 +20,7 @@
 import struct, socket
 from exceptions import Closed
 from packer import Packer
-from threading import Lock
+from threading import RLock
 from logging import getLogger
 
 raw = getLogger("qpid.io.raw")
@@ -75,12 +75,25 @@
 
   def __init__(self, sock):
     self.sock = sock
-    self.sock_lock = Lock()
+    self.sock_lock = RLock()
+    self._buf = ""
 
   def aborted(self):
     return False
 
   def write(self, buf):
+    self._buf += buf
+
+  def flush(self):
+    self.sock_lock.acquire()
+    try:
+      self._write(self._buf)
+      self._buf = ""
+      frm.debug("FLUSHED")
+    finally:
+      self.sock_lock.release()
+
+  def _write(self, buf):
     while buf:
       try:
         n = self.sock.send(buf)
@@ -120,6 +133,7 @@
     self.sock_lock.acquire()
     try:
       self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+      self.flush()
     finally:
       self.sock_lock.release()
 
@@ -130,6 +144,8 @@
       track = frame.track & 0x0F
       self.pack(Frame.HEADER, frame.flags, frame.type, size, track, frame.channel)
       self.write(frame.payload)
+      if frame.isLastSegment() and frame.isLastFrame():
+        self.flush()
       frm.debug("SENT %s", frame)
     finally:
       self.sock_lock.release()

Modified: incubator/qpid/trunk/qpid/python/tests/framer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/framer.py?rev=668345&r1=668344&r2=668345&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/framer.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/framer.py Mon Jun 16 16:27:54 2008
@@ -37,6 +37,7 @@
           while True:
             frame = conn.read_frame()
             conn.write_frame(frame)
+            conn.flush()
         except Closed:
           pass
 
@@ -60,6 +61,7 @@
     c.write_frame(Frame(0, 1, 2, 3, "IS"))
     c.write_frame(Frame(0, 1, 2, 3, "A"))
     c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST"))
+    c.flush()
 
     f = c.read_frame()
     assert f.flags & FIRST_FRM



Mime
View raw message