qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1485909 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/
Date Fri, 24 May 2013 00:50:48 GMT
Author: gsim
Date: Fri May 24 00:50:47 2013
New Revision: 1485909

URL: http://svn.apache.org/r1485909
Log:
QPID-4859: ensure flush is called on journals

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri May 24 00:50:47 2013
@@ -1198,6 +1198,7 @@ set (qpidbroker_SOURCES
      qpid/broker/ExchangeRegistry.cpp
      qpid/broker/FanOutExchange.cpp
      qpid/broker/HeadersExchange.cpp
+     qpid/broker/IngressCompletion.cpp
      qpid/broker/Link.cpp
      qpid/broker/LinkRegistry.cpp
      qpid/broker/LossyQueue.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri May 24 00:50:47 2013
@@ -635,6 +635,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/HeadersExchange.cpp \
   qpid/broker/HeadersExchange.h \
   qpid/broker/AsyncCompletion.h \
+  qpid/broker/IngressCompletion.h \
+  qpid/broker/IngressCompletion.cpp \
   qpid/broker/IndexedDeque.h \
   qpid/broker/Link.cpp \
   qpid/broker/Link.h \

Added: qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp?rev=1485909&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.cpp Fri May 24 00:50:47 2013
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "IngressCompletion.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+IngressCompletion::~IngressCompletion() {}
+
+void IngressCompletion::enqueueAsync(boost::shared_ptr<Queue> q)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    queues.push_back(q);
+}
+
+void IngressCompletion::flush()
+{
+    Queues copy;
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        queues.swap(copy);
+    }
+    for (Queues::const_iterator i = copy.begin(); i != copy.end(); ++i) {
+        (*i)->flush();
+    }
+}
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h?rev=1485909&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/IngressCompletion.h Fri May 24 00:50:47 2013
@@ -0,0 +1,51 @@
+#ifndef QPID_BROKER_INGRESSCOMPLETION_H
+#define QPID_BROKER_INGRESSCOMPLETION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AsyncCompletion.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+/**
+ * An AsyncCompletion object for async enqueues, that can be flushed
+ * when needed
+ */
+class IngressCompletion : public AsyncCompletion
+{
+  public:
+    virtual ~IngressCompletion();
+
+    void enqueueAsync(boost::shared_ptr<Queue>);
+    void flush();
+  private:
+    typedef std::vector<boost::shared_ptr<Queue> > Queues;
+    Queues queues;
+    qpid::sys::Mutex lock;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_INGRESSCOMPLETION_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri May 24 00:50:47 2013
@@ -50,7 +50,7 @@ enum MessageState
 
 class Message {
 public:
-    class Encoding : public AsyncCompletion
+    class Encoding : public IngressCompletion
     {
       public:
         virtual ~Encoding() {}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Fri May 24 00:50:47 2013
@@ -1,3 +1,4 @@
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,7 +22,7 @@
 
 
 #include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Queue.h"
 #include <iostream>
 
 using namespace qpid::broker;
@@ -32,7 +33,7 @@ namespace broker {
 PersistableMessage::~PersistableMessage() {}
 PersistableMessage::PersistableMessage() : ingressCompletion(0), persistenceId(0) {}
 
-void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<AsyncCompletion>
i)
+void PersistableMessage::setIngressCompletion(boost::intrusive_ptr<IngressCompletion>
i)
 {
     ingressCompletion = i.get();
     /**
@@ -57,21 +58,13 @@ void PersistableMessage::setIngressCompl
     }
 }
 
-
-void PersistableMessage::flush()
-{
-    //TODO: is this really the right place for this?
-}
-
-
-void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr, MessageStore*)
+void PersistableMessage::enqueueAsync(boost::shared_ptr<Queue> q)
 {
     enqueueStart();
+    ingressCompletion->enqueueAsync(q);
 }
 
-bool PersistableMessage::isDequeueComplete() { return false; }
 void PersistableMessage::dequeueComplete() {}
-void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr, MessageStore*) {}
 
 }}
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri May 24 00:50:47 2013
@@ -31,8 +31,8 @@
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/amqp_framing.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/broker/PersistableQueue.h"
-#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/IngressCompletion.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace types {
@@ -57,36 +57,27 @@ class PersistableMessage : public Persis
      * operations have completed, the transfer of this message from the client
      * may be considered complete.
      */
-    AsyncCompletion* ingressCompletion;
-    boost::intrusive_ptr<AsyncCompletion> holder;
+    IngressCompletion* ingressCompletion;
+    boost::intrusive_ptr<IngressCompletion> holder;
     mutable uint64_t persistenceId;
 
   public:
     virtual ~PersistableMessage();
     PersistableMessage();
 
-    void flush();
-
-    QPID_BROKER_EXTERN void setStore(MessageStore*);
-
     virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
 
     /** track the progress of a message received by the broker - see ingressCompletion above
*/
     QPID_BROKER_INLINE_EXTERN bool isIngressComplete() { return ingressCompletion->isDone();
}
-    QPID_BROKER_INLINE_EXTERN AsyncCompletion& getIngressCompletion() { return *ingressCompletion;
}
-    QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<AsyncCompletion>
i);
+    QPID_BROKER_INLINE_EXTERN IngressCompletion& getIngressCompletion() { return *ingressCompletion;
}
+    QPID_BROKER_EXTERN void setIngressCompletion(boost::intrusive_ptr<IngressCompletion>
i);
 
     QPID_BROKER_INLINE_EXTERN void enqueueStart() { ingressCompletion->startCompleter();
}
     QPID_BROKER_INLINE_EXTERN void enqueueComplete() { ingressCompletion->finishCompleter();
}
 
-    QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
-                                         MessageStore* _store);
-
+    QPID_BROKER_EXTERN void enqueueAsync(boost::shared_ptr<Queue> queue);
 
-    QPID_BROKER_EXTERN bool isDequeueComplete();
     QPID_BROKER_EXTERN void dequeueComplete();
-    QPID_BROKER_EXTERN void dequeueAsync(PersistableQueue::shared_ptr queue,
-                                         MessageStore* _store);
 
     uint64_t getPersistenceId() const { return persistenceId; }
     void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId;
}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1485909&r1=1485908&r2=1485909&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri May 24 00:50:47 2013
@@ -846,7 +846,7 @@ bool Queue::enqueue(TransactionContext* 
         // when it considers the message stored.
         boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
         assert(pmsg);
-        pmsg->enqueueAsync(shared_from_this(), store);
+        pmsg->enqueueAsync(shared_from_this());
         try {
             store->enqueue(ctxt, pmsg, *this);
         } catch (...) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message