qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r589731 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/Broker.cpp framing/SessionState.cpp framing/SessionState.h
Date Mon, 29 Oct 2007 16:50:46 GMT
Author: aconway
Date: Mon Oct 29 09:50:45 2007
New Revision: 589731

URL: http://svn.apache.org/viewvc?rev=589731&view=rev
Log:
##-*-text-*-

Added qpidd --ack option  to set ack/solicit-ack interval. 0 disabled acks.

Sessions with 0 timeout never ack and don't store replay frames.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=589731&r1=589730&r2=589731&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 29 09:50:45 2007
@@ -65,7 +65,7 @@
     storeAsync(false),
     enableMgmt(0),
     mgmtPubInterval(10),
-    ack(100)	    
+    ack(100)
 {
     addOptions()
         ("port,p", optValue(port,"PORT"),
@@ -87,7 +87,9 @@
         ("mgmt,m", optValue(enableMgmt,"yes|no"),
          "Enable Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
-         "Management Publish Interval");
+         "Management Publish Interval")
+        ("ack", optValue(ack, "N"),
+         "Send ack/solicit-ack at least every N frames. 0 disables voluntary acks/solitict-ack");
 }
 
 const std::string empty;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp?rev=589731&r1=589730&r2=589731&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp Mon Oct 29 09:50:45 2007
@@ -40,11 +40,22 @@
     ackInterval(ack),
     sendAckAt(lastReceived+ackInterval),
     solicitAckAt(lastSent+ackInterval),
-    ackSolicited(false)
+    ackSolicited(false),
+    resumable(true)
+{}
+
+SessionState::SessionState(const Uuid& uuid) :
+    state(ATTACHED),
+    id(uuid),
+    lastReceived(-1),
+    lastSent(-1),
+    ackInterval(0),
+    sendAckAt(0),
+    solicitAckAt(0),
+    ackSolicited(false),
+    resumable(false)
 {
-    assert(ackInterval > 0);
 }
-
 namespace {
 bool isSessionCommand(const AMQFrame& f) {
     return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID;
@@ -58,10 +69,9 @@
         throw CommandInvalidException(
             QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
     assert(state = ATTACHED);
-    assert(lastReceived<sendAckAt);
     ++lastReceived;
     QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
-    if (lastReceived == sendAckAt)
+    if (ackInterval && lastReceived == sendAckAt)
         return sendingAck();
     else
         return boost::none;
@@ -70,10 +80,12 @@
 bool SessionState::sent(const AMQFrame& f) {
     if (isSessionCommand(f))
         return false;
-    unackedOut.push_back(f);
+    if (resumable)
+        unackedOut.push_back(f);
     ++lastSent;
     QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
-    return (state!=RESUMING) &&
+    return ackInterval &&
+        (state!=RESUMING) &&
         (lastSent == solicitAckAt) &&
         sendingSolicit();
 }
@@ -105,10 +117,12 @@
     if (ackSolicited)
         return false;
     solicitAckAt = lastSent + ackInterval;
-    return true;
+    return ackInterval != 0;
 }
 
 SequenceNumber SessionState::resuming() {
+    if (!resumable)
+        throw InternalErrorException("Session is not resumable");
     state = RESUMING;
     return sendingAck();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h?rev=589731&r1=589730&r2=589731&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h Mon Oct 29 09:50:45 2007
@@ -35,12 +35,11 @@
 
 /**
  * Session state common to client and broker.
- * Implements session ack/resume protcools.
+ * Stores replay frames, implements session ack/resume protcools.
  *
  * A SessionState is always associated with an _open_ session (attached or
  * suspended) it is destroyed when the session is closed.
  *
- * A template to make it protocol independent and easy to test.
  */
 class SessionState
 {
@@ -58,9 +57,16 @@
      *Create a newly opened active session.
      *@param ackInterval send/solicit an ack whenever N unacked frames
      * have been received/sent.
-     *@pre ackInterval > 0
+     * 
+     * N=0 disables voluntary send/solict ack.
+     */
+    SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true));
+
+    /**
+     * Create a non-resumable session. Does not store session frames,
+     * never volunteers ack or solicit-ack.
      */
-    SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true));
+    SessionState(const framing::Uuid& id=framing::Uuid(true));
 
     const framing::Uuid& getId() const { return id; }
     State getState() const { return state; }
@@ -103,6 +109,7 @@
 
     SequenceNumber getLastSent() const { return lastSent; }
     SequenceNumber getLastReceived() const { return lastReceived; }
+
   private:
     typedef std::deque<AMQFrame> Unacked;
 
@@ -110,6 +117,7 @@
 
     State state;
     framing::Uuid id;
+
     Unacked unackedOut;
     SequenceNumber lastReceived;
     SequenceNumber lastSent;
@@ -118,6 +126,7 @@
     SequenceNumber solicitAckAt;
     bool ackSolicited;
     bool suspending;
+    bool resumable;
 };
 
 



Mime
View raw message