qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r590751 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/client/ qpid/framing/ tests/
Date Wed, 31 Oct 2007 16:57:16 GMT
Author: aconway
Date: Wed Oct 31 09:56:57 2007
New Revision: 590751

URL: http://svn.apache.org/viewvc?rev=590751&view=rev
Log:

Simplify SessionState, preparing for session thread safety fixes.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Wed Oct 31 09:56:57 2007
@@ -38,7 +38,8 @@
       connection(c), channel(ch, &c.getOutput()),
       proxy(out),               // Via my own handleOut() for L2 data.
       peerSession(channel),     // Direct to channel for L2 commands.
-      ignoring(false) {}
+      ignoring(false),
+      resuming(false) {}
 
 SessionHandler::~SessionHandler() {}
 
@@ -114,7 +115,8 @@
     assertClosed("resume");
     session = connection.broker.getSessionManager().resume(id);
     session->attach(*this);
-    SequenceNumber seq = session->resuming();
+    resuming=true;
+    SequenceNumber seq = session->sendingAck();
     peerSession.attached(session->getId(), session->getTimeout());
     proxy.getSession().ack(seq, SequenceNumberSet());
 }
@@ -148,7 +150,7 @@
 }
 
 void SessionHandler::localSuspend() {
-    if (session.get() && session->getState() == SessionState::ATTACHED) {
+    if (session.get()) {
         session->detach();
         connection.broker.getSessionManager().suspend(session);
     }
@@ -166,7 +168,8 @@
                           const SequenceNumberSet& /*seenFrameSet*/)
 {
     assertAttached("ack");
-    if (session->getState() == SessionState::RESUMING) {
+    if (resuming) {
+        resuming=false;
         session->receivedAck(cumulativeSeenMark);
         framing::SessionState::Replay replay=session->replay();
         std::for_each(replay.begin(), replay.end(),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Wed Oct 31 09:56:57 2007
@@ -92,6 +92,7 @@
     framing::AMQP_ClientProxy proxy;
     framing::AMQP_ClientProxy::Session peerSession;
     bool ignoring;
+    bool resuming;
     std::auto_ptr<SessionState> session;
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Wed Oct 31 09:56:57 2007
@@ -56,7 +56,6 @@
 void  SessionManager::suspend(std::auto_ptr<SessionState> session) {
     Mutex::ScopedLock l(lock);
     active.erase(session->getId());
-    session->suspend();
     session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
     suspended.push_back(session.release()); // In expiry order
     eraseExpired();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Oct 31 09:56:57 2007
@@ -52,7 +52,6 @@
         break;
       case RESUMING:
         assert(session);
-        assert(session->getState() == SessionState::RESUMING);
         assert(code==REPLY_SUCCESS);
         assert(connection);
         assert(channel.get());
@@ -143,7 +142,6 @@
     if (state != CLOSED) {
         invariant();
         detach(code, text);
-        session->suspend();
         setState(SUSPENDED);
     }
 }
@@ -202,7 +200,7 @@
         if (state==OPEN)
             doSuspend(REPLY_SUCCESS, OK);
         check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed."));
-        SequenceNumber sendAck=session->resuming();
+        SequenceNumber sendAck=session->sendingAck();
         attaching(c);
         proxy.resume(getId());
         waitFor(OPEN);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp Wed Oct 31 09:56:57 2007
@@ -33,7 +33,6 @@
 namespace framing {
 
 SessionState::SessionState(uint32_t ack,  const Uuid& uuid) :
-    state(ATTACHED),
     id(uuid),
     lastReceived(-1),
     lastSent(-1),
@@ -45,7 +44,6 @@
 {}
 
 SessionState::SessionState(const Uuid& uuid) :
-    state(ATTACHED),
     id(uuid),
     lastReceived(-1),
     lastSent(-1),
@@ -65,10 +63,6 @@
 boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
     if (isSessionCommand(f))
         return boost::none;
-    if (state==RESUMING)
-        throw CommandInvalidException(
-            QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
-    assert(state = ATTACHED);
     ++lastReceived;
     QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
     if (ackInterval && lastReceived == sendAckAt)
@@ -85,7 +79,6 @@
     ++lastSent;
     QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
     return ackInterval &&
-        (state!=RESUMING) &&
         (lastSent == solicitAckAt) &&
         sendingSolicit();
 }
@@ -97,8 +90,6 @@
 }
 
 void SessionState::receivedAck(SequenceNumber acked) {
-    if (state==RESUMING) state=ATTACHED;
-    assert(state==ATTACHED);
      if (lastSent < acked)
         throw InvalidArgumentException("Invalid sequence number in ack");
     size_t keep = lastSent - acked;
@@ -113,22 +104,10 @@
 }
 
 bool SessionState::sendingSolicit() {
-    assert(state == ATTACHED);
     if (ackSolicited)
         return false;
     solicitAckAt = lastSent + ackInterval;
     return ackInterval != 0;
-}
-
-SequenceNumber SessionState::resuming() {
-    if (!resumable)
-        throw InternalErrorException("Session is not resumable");
-    state = RESUMING;
-    return sendingAck();
-}
-
-void SessionState::suspend() {
-    state = SUSPENDED;
 }
 
 }} // namespace qpid::framing

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h Wed Oct 31 09:56:57 2007
@@ -35,7 +35,10 @@
 
 /**
  * Session state common to client and broker.
- * Stores replay frames, implements session ack/resume protcools.
+ * 
+ * Stores data needed to resume a session: replay frames, implements
+ * session ack/resume protcools. Stores handler chains for the session,
+ * handlers may themselves store state.
  *
  * A SessionState is always associated with an _open_ session (attached or
  * suspended) it is destroyed when the session is closed.
@@ -46,13 +49,6 @@
   public:
     typedef std::vector<AMQFrame> Replay;
 
-    /** States of a session. */
-    enum State {
-        SUSPENDED, ///< Suspended, detached from any channel.
-        RESUMING, ///< Resuming: waiting for initial ack from peer.
-        ATTACHED ///< Attached to channel and operating normally.
-    };
-
     /**
      *Create a newly opened active session.
      *@param ackInterval send/solicit an ack whenever N unacked frames
@@ -60,7 +56,8 @@
      * 
      * N=0 disables voluntary send/solict ack.
      */
-    SessionState(uint32_t ackInterval, const framing::Uuid& id=framing::Uuid(true));
+    SessionState(uint32_t ackInterval,
+                 const framing::Uuid& id=framing::Uuid(true));
 
     /**
      * Create a non-resumable session. Does not store session frames,
@@ -69,7 +66,6 @@
     SessionState(const framing::Uuid& id=framing::Uuid(true));
 
     const framing::Uuid& getId() const { return id; }
-    State getState() const { return state; }
     
     /** Received incoming L3 frame.
      * @return SequenceNumber if an ack should be sent, empty otherwise.
@@ -92,13 +88,6 @@
      */
     Replay replay();
 
-    /** Suspend the session. */
-    void suspend();
-
-    /** Start resume protocol for the session.
-     *@returns sequence number to ack immediately.  */
-    SequenceNumber resuming();
-
     /** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
      * 
      * Note: when received() returns a sequence number this function
@@ -115,9 +104,7 @@
 
     bool sendingSolicit();
 
-    State state;
     framing::Uuid id;
-
     Unacked unackedOut;
     SequenceNumber lastReceived;
     SequenceNumber lastSent;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp?rev=590751&r1=590750&r2=590751&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp Wed Oct 31 09:56:57 2007
@@ -97,21 +97,16 @@
     // Replay of all frames.
     SessionState session(100);
     sent(session, "abc"); 
-    session.suspend(); session.resuming();
     session.receivedAck(-1);
     BOOST_CHECK_EQUAL(replayChars(session.replay()), "abc");
 
     // Replay with acks
     session.receivedAck(0); // ack a.
-    session.suspend();
-    session.resuming();
     session.receivedAck(1); // ack b.
     BOOST_CHECK_EQUAL(replayChars(session.replay()), "c");
 
     // Replay after further frames.
     sent(session, "def");
-    session.suspend();
-    session.resuming();
     session.receivedAck(3);
     BOOST_CHECK_EQUAL(replayChars(session.replay()), "ef");
 



Mime
View raw message