qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1433055 - in /qpid/trunk/qpid/cpp/src/qpid/client: SessionImpl.cpp SessionImpl.h
Date Mon, 14 Jan 2013 19:08:53 GMT
Author: aconway
Date: Mon Jan 14 19:08:53 2013
New Revision: 1433055

URL: http://svn.apache.org/viewvc?rev=1433055&view=rev
Log:
QPID-4514: Clean up cluster code: SessionImpl

Clean up obsolete code in SessionImpl class used only by defunct cluster code:
- Remove SessionImpl::send reframe parameter.
- Remove doClearDeliveryPropertiesExchange flag.
- Remove disableAutoDetach and autoDetach flag.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=1433055&r1=1433054&r2=1433055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Jan 14 19:08:53 2013
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,9 +61,7 @@ SessionImpl::SessionImpl(const std::stri
       ioHandler(*this),
       proxy(ioHandler),
       nextIn(0),
-      nextOut(0),
-      doClearDeliveryPropertiesExchange(true),
-      autoDetach(true)
+      nextOut(0)
 {
     channel.next = connection.get();
 }
@@ -72,12 +70,10 @@ SessionImpl::~SessionImpl() {
     {
         Lock l(state);
         if (state != DETACHED && state != DETACHING) {
-            if (autoDetach) {
-                QPID_LOG(warning, "Session was not closed cleanly: " << id);
-                // Inform broker but don't wait for detached as that deadlocks.
-                // The detached will be ignored as the channel will be invalid.
-                try { detach(); } catch (...) {}    // ignore errors.
-            }
+            QPID_LOG(warning, "Session was not closed cleanly: " << id);
+            // Inform broker but don't wait for detached as that deadlocks.
+            // The detached will be ignored as the channel will be invalid.
+            try { detach(); } catch (...) {}    // ignore errors.
             setState(DETACHED);
             handleClosed();
             state.waitWaiters();
@@ -136,10 +132,10 @@ void SessionImpl::resume(boost::shared_p
 void SessionImpl::suspend() //user thread
 {
     Lock l(state);
-    detach();    
+    detach();
 }
 
-void SessionImpl::detach() //call with lock held 
+void SessionImpl::detach() //call with lock held
 {
     if (state == ATTACHED) {
         setState(DETACHING);
@@ -149,8 +145,8 @@ void SessionImpl::detach() //call with l
 
 
 uint16_t SessionImpl::getChannel() const // user thread
-{ 
-    return channel; 
+{
+    return channel;
 }
 
 void SessionImpl::setChannel(uint16_t c) // user thread
@@ -182,7 +178,7 @@ void SessionImpl::waitForCompletionImpl(
 
 bool SessionImpl::isComplete(const SequenceNumber& id)
 {
-    Lock l(state);    
+    Lock l(state);
     return !incompleteOut.contains(id);
 }
 
@@ -219,7 +215,7 @@ framing::SequenceNumber SessionImpl::get
     return --firstIncomplete;
 }
 
-struct MarkCompleted 
+struct MarkCompleted
 {
     const SequenceNumber& id;
     SequenceSet& completedIn;
@@ -230,7 +226,7 @@ struct MarkCompleted 
     {
         if (id >= end) {
             completedIn.add(start, end);
-        } else if (id >= start) { 
+        } else if (id >= start) {
             completedIn.add(start, id);
         }
     }
@@ -244,13 +240,13 @@ void SessionImpl::markCompleted(const Se
     completedIn.add(ids);
     if (notifyPeer) {
         sendCompletion();
-    }    
+    }
 }
 
 void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative, bool notifyPeer)
 {
     Lock l(state);
-    if (cumulative) {        
+    if (cumulative) {
         //everything in incompleteIn less than or equal to id is now complete
         MarkCompleted f(id, completedIn);
         incompleteIn.for_each(f);
@@ -260,11 +256,11 @@ void SessionImpl::markCompleted(const Se
         incompleteIn.remove(completedIn);
     } else if (incompleteIn.contains(id)) {
         incompleteIn.remove(id);
-        completedIn.add(id);            
+        completedIn.add(id);
     }
     if (notifyPeer) {
         sendCompletion();
-    }    
+    }
 }
 
 void SessionImpl::setException(const sys::ExceptionHolder& ex) {
@@ -310,42 +306,24 @@ namespace {
 struct SendContentFn {
     FrameHandler& handler;
     void operator()(const AMQFrame& f) {
-        if (!f.getMethod()) 
+        if (!f.getMethod())
             handler(const_cast<AMQFrame&>(f));
     }
     SendContentFn(FrameHandler& h) : handler(h) {}
 };
 
-// Adaptor to make FrameSet look like MethodContent; used in cluster update client
-struct MethodContentAdaptor : MethodContent
-{
-    AMQHeaderBody header;
-    const std::string content;
-
-    MethodContentAdaptor(const FrameSet& f) : header(*f.getHeaders()), content(f.getContent())
{}
-
-    const AMQHeaderBody& getHeader() const
-    {
-        return header;
-    }
-    const std::string& getData() const
-    {
-        return content;
-    }
-};
-
 }
-    
-Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool reframe)
{
+
+Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
     Acquire a(sendLock);
     SequenceNumber id = nextOut++;
     {
         Lock l(state);
-        checkOpen();    
+        checkOpen();
         incompleteOut.add(id);
     }
     Future f(id);
-    if (command.getMethod()->resultExpected()) {        
+    if (command.getMethod()->resultExpected()) {
         Lock l(state);
         //result listener must be set before the command is sent
         f.setFutureResult(results.listenForResult(id));
@@ -353,14 +331,8 @@ Future SessionImpl::send(const AMQBody& 
     AMQFrame frame(command);
     frame.setEof(false);
     handleOut(frame);
-
-    if (reframe) {
-        MethodContentAdaptor c(content);
-        sendContent(c);
-    } else {
-        SendContentFn send(out);
-        content.map(send);
-    }
+    SendContentFn send(out);
+    content.map(send);
     return f;
 }
 
@@ -375,11 +347,11 @@ Future SessionImpl::sendCommand(const AM
     SequenceNumber id = nextOut++;
     {
         Lock l(state);
-        checkOpen();    
+        checkOpen();
         incompleteOut.add(id);
     }
     Future f(id);
-    if (command.getMethod()->resultExpected()) {        
+    if (command.getMethod()->resultExpected()) {
         Lock l(state);
         //result listener must be set before the command is sent
         f.setFutureResult(results.listenForResult(id));
@@ -399,23 +371,13 @@ void SessionImpl::sendContent(const Meth
 {
     AMQFrame header(content.getHeader());
 
-    // doClearDeliveryPropertiesExchange is set by cluster update client so
-    // it can send messages with delivery-properties.exchange set.
-    //
-    if (doClearDeliveryPropertiesExchange) {
-        // Normal client is not allowed to set the delivery-properties.exchange
-        // so clear it here.
-        AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
-        if (headerp && headerp->get<DeliveryProperties>())
-            headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
-    }
     header.setFirstSegment(false);
     uint64_t data_length = content.getData().length();
     if(data_length > 0){
         header.setLastSegment(false);
-        handleOut(header);   
+        handleOut(header);
         /*Note: end of frame marker included in overhead but not in size*/
-        const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead(); 
+        const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
 
         if(data_length < frag_size){
             AMQFrame frame((AMQContentBody(content.getData())));
@@ -442,7 +404,7 @@ void SessionImpl::sendContent(const Meth
             }
         }
     } else {
-        handleOut(header);   
+        handleOut(header);
     }
 }
 
@@ -462,7 +424,7 @@ bool isContentFrame(AMQFrame& frame)
 {
     AMQBody* body = frame.getBody();
     uint8_t type = body->type();
-    return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); 
+    return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
 }
 
 void SessionImpl::handleIn(AMQFrame& frame) // network thread
@@ -585,7 +547,7 @@ void SessionImpl::timeout(uint32_t t)
 void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
 {
     if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for
command-point");
-    
+
     Lock l(state);
     nextIn = id;
 }
@@ -677,10 +639,10 @@ void SessionImpl::exception(uint16_t err
 {
     Lock l(state);
     setExceptionLH(createSessionException(errorCode, description));
-    QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()

+    QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()
              << " [caused by " << commandId << " " << classCode <<
":" << commandCode << "]");
 
-    if (detachedLifetime) 
+    if (detachedLifetime)
         setTimeout(0);
 }
 
@@ -748,6 +710,4 @@ boost::shared_ptr<ConnectionImpl> Sessio
     return connection;
 }
 
-void SessionImpl::disableAutoDetach() { autoDetach = false; }
-
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=1433055&r1=1433054&r2=1433055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Jan 14 19:08:53 2013
@@ -87,15 +87,7 @@ public:
 
     Future send(const framing::AMQBody& command);
     Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-    /**
-     * This method takes the content as a FrameSet; if reframe=false,
-     * the caller is resposnible for ensuring that the header and
-     * content frames in that set are correct for this connection
-     * (right flags, right fragmentation etc). If reframe=true, then
-     * the header and content from the frameset will be copied and
-     * reframed correctly for the connection.
-     */
-    QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet&
content, bool reframe=false);
+    QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet&
content);
     void sendRawFrame(framing::AMQFrame& frame);
 
     Demux& getDemux();
@@ -125,11 +117,6 @@ public:
      */
     boost::shared_ptr<ConnectionImpl> getConnection();
 
-    void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange
= b; }
-
-    /** Suppress sending detach in destructor. Used by cluster to build session state */
-    void disableAutoDetach();
-
 private:
     enum State {
         INACTIVE,
@@ -225,10 +212,6 @@ private:
 
     SessionState sessionState;
 
-    bool doClearDeliveryPropertiesExchange;
-
-    bool autoDetach;
-    
   friend class client::SessionHandler;
 };
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message