qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1206352 - in /qpid/branches/qpid-3603/qpid/cpp/src: ./ qpid/broker/ qpid/ha/
Date Fri, 25 Nov 2011 21:52:49 GMT
Author: aconway
Date: Fri Nov 25 21:52:46 2011
New Revision: 1206352

URL: http://svn.apache.org/viewvc?rev=1206352&view=rev
Log:
QPID-3603: Move broker::ReplicatingSubscription to ha namespace and ha plugin.

Added:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ConsumerFactory.h   (with props)
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp   (contents,
props changed)
      - copied, changed from r1206351, qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h   (contents, props
changed)
      - copied, changed from r1206351, qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
Removed:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h
Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3603/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.h

Modified: qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am Fri Nov 25 21:52:46 2011
@@ -535,6 +535,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/ConnectionState.h \
   qpid/broker/ConnectionToken.h \
   qpid/broker/Consumer.h \
+  qpid/broker/ConsumerFactory.h \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
   qpid/broker/Deliverable.h \
@@ -622,8 +623,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/QueuedMessage.h \
   qpid/broker/QueueFlowLimit.h \
   qpid/broker/QueueFlowLimit.cpp \
-  qpid/broker/ReplicatingSubscription.h \
-  qpid/broker/ReplicatingSubscription.cpp \
   qpid/broker/RateFlowcontrol.h \
   qpid/broker/RecoverableConfig.h \
   qpid/broker/RecoverableExchange.h \

Modified: qpid/branches/qpid-3603/qpid/cpp/src/ha.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/ha.mk?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/ha.mk Fri Nov 25 21:52:46 2011
@@ -31,6 +31,8 @@ ha_la_SOURCES =					\
   qpid/ha/Settings.h				\
   qpid/ha/QueueReplicator.h			\
   qpid/ha/QueueReplicator.cpp			\
+  qpid/ha/ReplicatingSubscription.h		\
+  qpid/ha/ReplicatingSubscription.cpp		\
   qpid/ha/WiringReplicator.cpp			\
   qpid/ha/WiringReplicator.h
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.h?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.h Fri Nov 25 21:52:46 2011
@@ -37,6 +37,7 @@
 #include "qpid/broker/Vhost.h"
 #include "qpid/broker/System.h"
 #include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/ConsumerFactory.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/broker/Broker.h"
@@ -199,6 +200,7 @@ public:
     bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
+    ConsumerFactories consumerFactories;
 
   public:
     virtual ~Broker();
@@ -357,6 +359,8 @@ public:
                 const std::string& key,
                 const std::string& userId,
                 const std::string& connectionId);
+
+    ConsumerFactories&  getConsumerFactories() { return consumerFactories; }
 };
 
 }}

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Consumer.h?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Consumer.h Fri Nov 25 21:52:46 2011
@@ -31,6 +31,9 @@ namespace broker {
 class Queue;
 class QueueListeners;
 
+/**
+ * Base class for consumers which represent a subscription to a queue.
+ */
 class Consumer {
     const bool acquires;
     // inListeners allows QueueListeners to efficiently track if this instance is registered

Added: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ConsumerFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ConsumerFactory.h?rev=1206352&view=auto
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ConsumerFactory.h (added)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ConsumerFactory.h Fri Nov 25 21:52:46
2011
@@ -0,0 +1,70 @@
+#ifndef QPID_BROKER_CONSUMERFACTORY_H
+#define QPID_BROKER_CONSUMERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
+// Refactor to use a more abstract interface.
+
+#include "qpid/broker/SemanticState.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Base class for consumer factories. Plugins can register a
+ * ConsumerFactory via Broker::getConsumerFactories(). Each time a
+ * consumer is created, each factory is tried in turn till one returns
+ * non-0.
+ */
+class ConsumerFactory
+{
+  public:
+    virtual ~ConsumerFactory() {}
+
+    virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+        SemanticState* parent,
+        const std::string& name, boost::shared_ptr<Queue> queue,
+        bool ack, bool acquire, bool exclusive, const std::string& tag,
+        const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable&
arguments) = 0;
+};
+
+/** A set of factories held by the broker
+ * THREAD UNSAFE: see notes on member functions.
+ */
+class ConsumerFactories {
+  public:
+    typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories;
+
+    /** Thread safety: May only be called during plug-in initialization. */
+    void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf);
}
+
+    /** Thread safety: May only be called after plug-in initialization. */
+    const Factories& get() const { return factories; }
+
+  private:
+    Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CONSUMERFACTORY_H*/

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ConsumerFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ConsumerFactory.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Nov 25 21:52:46
2011
@@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const Que
                                bool _acquired,
                                bool accepted, 
                                bool _windowing,
-                               uint32_t _credit, bool _delayedCompletion) : msg(_msg),
+                               uint32_t _credit, bool _isDelayedCompletion) : msg(_msg),
                                                   queue(_queue), 
                                                   tag(_tag),
                                                   acquired(_acquired),
@@ -47,7 +47,7 @@ DeliveryRecord::DeliveryRecord(const Que
                                                   ended(accepted && acquired),
                                                   windowing(_windowing),
                                                   credit(msg.payload ? msg.payload->getRequiredCredit()
: _credit),
-                                                  delayedCompletion(_delayedCompletion)
+                                                  isDelayedCompletion(_isDelayedCompletion)
 {}
 
 bool DeliveryRecord::setEnded()
@@ -115,7 +115,7 @@ bool DeliveryRecord::accept(TransactionC
     if (!ended) {
         if (acquired) {
             queue->dequeue(ctxt, msg);
-        } else if (delayedCompletion) {
+        } else if (isDelayedCompletion) {
             //TODO: this is a nasty way to do this; change it
             msg.payload->getIngressCompletion().finishCompleter();
             QPID_LOG(debug, "Completed " << msg.payload.get());

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Nov 25 21:52:46
2011
@@ -63,7 +63,7 @@ class DeliveryRecord
      * after that).
      */
     uint32_t credit;
-    bool delayedCompletion;
+    bool isDelayedCompletion;
 
   public:
     QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
@@ -73,7 +73,7 @@ class DeliveryRecord
                                       bool accepted,
                                       bool windowing,
                                       uint32_t credit=0,       // Only used if msg is empty.
-                                      bool delayedCompletion=false
+                                      bool isDelayedCompletion=false
     );
     
     bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id);
}

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Nov 25 21:52:46
2011
@@ -25,9 +25,7 @@
 #include "qpid/broker/DtxAck.h"
 #include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/Message.h"
-#include "qpid/ha/WiringReplicator.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/ReplicatingSubscription.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SessionOutputException.h"
 #include "qpid/broker/TxAccept.h"
@@ -110,15 +108,25 @@ bool SemanticState::exists(const string&
 namespace {
     const std::string SEPARATOR("::");
 }
-    
+
 void SemanticState::consume(const string& tag,
                             Queue::shared_ptr queue, bool ackRequired, bool acquire,
-                            bool exclusive, const string& resumeId, uint64_t resumeTtl,
const FieldTable& arguments)
+                            bool exclusive, const string& resumeId, uint64_t resumeTtl,
+                            const FieldTable& arguments)
 {
     // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe,
destination).
     // Create a globally unique name so the broker can identify individual consumers
     std::string name = session.getSessionId().str() + SEPARATOR + tag;
-    ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire,
exclusive, tag, resumeId, resumeTtl, arguments));
+    const ConsumerFactories::Factories& cf(
+        session.getBroker().getConsumerFactories().get());
+    ConsumerImpl::shared_ptr c; // Null until some factory produces a consumer
+    for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i)
+        c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag,
+                         resumeId, resumeTtl, arguments);
+    if (!c)                     // Create plain consumer
+        c = ConsumerImpl::shared_ptr(
+            new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag,
+                             resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     consumers[tag] = c;
 }
@@ -268,26 +276,6 @@ void SemanticState::record(const Deliver
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
-SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState*
parent,
-                                                                            const string&
name,
-                                                                            Queue::shared_ptr
queue,
-                                                                            bool ack,
-                                                                            bool acquire,
-                                                                            bool exclusive,
-                                                                            const string&
tag,
-                                                                            const string&
resumeId,
-                                                                            uint64_t resumeTtl,
-                                                                            const framing::FieldTable&
arguments)
-{
-    if (arguments.isSet("qpid.replicating-subscription")) {
-        shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire,
exclusive, tag, resumeId, resumeTtl, arguments));
-        boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init();
-        return result;
-    } else {
-        return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive,
tag, resumeId, resumeTtl, arguments));
-    }
-}
-
 SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
                                           const string& _name,
                                           Queue::shared_ptr _queue,
@@ -299,7 +287,6 @@ SemanticState::ConsumerImpl::ConsumerImp
                                           uint64_t _resumeTtl,
                                           const framing::FieldTable& _arguments
 
-
 ) :
     Consumer(_name, _acquire),
     parent(_parent),
@@ -314,7 +301,7 @@ SemanticState::ConsumerImpl::ConsumerImp
     tag(_tag),
     resumeTtl(_resumeTtl),
     arguments(_arguments),
-    msgCredit(0),
+     msgCredit(0),
     byteCredit(0),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -360,7 +347,7 @@ bool SemanticState::ConsumerImpl::delive
 {
     assertClusterSafe();
     allocateCredit(msg.payload);
-    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected,
windowing, 0, dynamic_cast<const ReplicatingSubscription*>(this));
+    DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected,
windowing, 0, isDelayedCompletion());
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
     parent->deliver(record, sync);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1206352&r1=1206351&r2=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.h Fri Nov 25 21:52:46 2011
@@ -155,11 +155,10 @@ class SemanticState : private boost::non
         management::ManagementObject* GetManagementObject (void) const;
         management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args&
args, std::string& text);
 
-        static shared_ptr create(SemanticState* parent,
-                                 const std::string& name, boost::shared_ptr<Queue>
queue,
-                                 bool ack, bool acquire, bool exclusive, const std::string&
tag,
-                                 const std::string& resumeId, uint64_t resumeTtl, const
framing::FieldTable& arguments);
-
+        /** This consumer wants delayed completion.
+         * Overridden by ConsumerImpl subclasses.
+         */
+        virtual bool isDelayedCompletion() const { return false; }
     };
 
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;

Copied: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (from r1206351,
qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?p2=qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp&p1=qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp&r1=1206351&r2=1206352&rev=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Nov 25 21:52:46
2011
@@ -20,18 +20,19 @@
  */
 
 #include "ReplicatingSubscription.h"
-#include "Queue.h"
+#include "qpid/broker/Queue.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
-namespace broker {
+namespace ha {
 
 using namespace framing;
+using namespace broker;
 
 const std::string DOLLAR("$");
-const std::string INTERNAL("_internall");
+const std::string INTERNAL("_internal");
 
 class ReplicationStateInitialiser
 {

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (from r1206351,
qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?p2=qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h&p1=qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h&r1=1206351&r2=1206352&rev=1206352&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Nov 25 21:52:46
2011
@@ -23,35 +23,48 @@
  */
 
 #include "qpid/broker/SemanticState.h"
+#include "qpid/broker/QueueObserver.h"
 
 namespace qpid {
+
 namespace broker {
+class Message;
+class Queue;
+class QueuedMessage;
+class OwnershipToken;
+}
+
+namespace ha {
 
 /**
  * Subscriber to a remote queue that replicates to a local queue.
  */
-class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
+                                public broker::QueueObserver
 {
   public:
-    ReplicatingSubscription(SemanticState* parent,
-                            const std::string& name, boost::shared_ptr<Queue> queue,
+    ReplicatingSubscription(broker::SemanticState* parent,
+                            const std::string& name, boost::shared_ptr<broker::Queue>
,
                             bool ack, bool acquire, bool exclusive, const std::string&
tag,
-                            const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable&
arguments);
+                            const std::string& resumeId, uint64_t resumeTtl,
+                            const framing::FieldTable& arguments);
     ~ReplicatingSubscription();
 
     void init();
     void cancel();
-    bool deliver(QueuedMessage& msg);
-    void enqueued(const QueuedMessage&);
-    void dequeued(const QueuedMessage&);
-    void acquired(const QueuedMessage&) {}
-    void requeued(const QueuedMessage&) {}
+    bool deliver(broker::QueuedMessage& msg);
+    void enqueued(const broker::QueuedMessage&);
+    void dequeued(const broker::QueuedMessage&);
+    void acquired(const broker::QueuedMessage&) {}
+    void requeued(const broker::QueuedMessage&) {}
+
+    bool isDelayedCompletion() const { return true; }
 
   protected:
     bool doDispatch();
   private:
-    boost::shared_ptr<Queue> events;
-    boost::shared_ptr<Consumer> consumer;
+    boost::shared_ptr<broker::Queue> events;
+    boost::shared_ptr<broker::Consumer> consumer;
     qpid::framing::SequenceSet range;
 
     void generateDequeueEvent();
@@ -60,11 +73,11 @@ class ReplicatingSubscription : public S
       public:
         DelegatingConsumer(ReplicatingSubscription&);
         ~DelegatingConsumer();
-        bool deliver(QueuedMessage& msg);
+        bool deliver(broker::QueuedMessage& msg);
         void notify();
-        bool filter(boost::intrusive_ptr<Message>);
-        bool accept(boost::intrusive_ptr<Message>);
-        OwnershipToken* getSession();
+        bool filter(boost::intrusive_ptr<broker::Message>);
+        bool accept(boost::intrusive_ptr<broker::Message>);
+        broker::OwnershipToken* getSession();
       private:
         ReplicatingSubscription& delegate;
     };

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message