qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r711256 - in /incubator/qpid/trunk/qpid/cpp: examples/failover/ src/ src/qpid/client/
Date Tue, 04 Nov 2008 13:59:44 GMT
Author: gsim
Date: Tue Nov  4 05:59:44 2008
New Revision: 711256

URL: http://svn.apache.org/viewvc?rev=711256&view=rev
Log:
Adding a couple of utilities (don't alter any existing functionality) that are useful in applications
handling failover.
Added example of their use that I've been using in testing.


Added:
    incubator/qpid/trunk/qpid/cpp/examples/failover/replaying_sender.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/examples/failover/   (props changed)
    incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Nov  4 05:59:44 2008
@@ -3,3 +3,5 @@
 Makefile
 declare_queues
 direct_producer
+resuming_receiver
+replaying_sender

Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am?rev=711256&r1=711255&r2=711256&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/Makefile.am Tue Nov  4 05:59:44 2008
@@ -1,8 +1,9 @@
-examplesdir=$(pkgdatadir)/examples/direct
+examplesdir=$(pkgdatadir)/examples/failover
 
 include $(top_srcdir)/examples/makedist.mk
 
-noinst_PROGRAMS=direct_producer listener declare_queues
+noinst_PROGRAMS=direct_producer listener declare_queues resuming_receiver replaying_sender
+
 direct_producer_SOURCES=direct_producer.cpp
 direct_producer_LDADD=$(CLIENT_LIB)
 
@@ -12,10 +13,18 @@
 declare_queues_SOURCES=declare_queues.cpp
 declare_queues_LDADD=$(CLIENT_LIB)
 
-examples_DATA=               \
-	direct_producer.cpp  \
-	listener.cpp         \
-	declare_queues.cpp   \
+resuming_receiver_SOURCES=resuming_receiver.cpp
+resuming_receiver_LDADD=$(CLIENT_LIB)
+
+replaying_sender_SOURCES=replaying_sender.cpp
+replaying_sender_LDADD=$(CLIENT_LIB)
+
+examples_DATA=                \
+	direct_producer.cpp   \
+	listener.cpp          \
+	declare_queues.cpp    \
+	resuming_receiver.cpp \
+	replaying_sender.cpp  \
         $(MAKEDIST)
 
 # FIXME aconway 2008-10-10: add verify scripts.

Added: incubator/qpid/trunk/qpid/cpp/examples/failover/replaying_sender.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/replaying_sender.cpp?rev=711256&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/replaying_sender.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/replaying_sender.cpp Tue Nov  4 05:59:44 2008
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/MessageReplayTracker.h>
+#include <qpid/Exception.h>
+
+#include <iostream>
+#include <sstream>
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+class Sender : public FailoverManager::Command
+{
+  public:
+    Sender(const std::string& queue, uint count);
+    void execute(AsyncSession& session, bool isRetry);
+    uint getSent();
+  private:
+    MessageReplayTracker sender;
+    const uint count;
+    uint sent;
+    Message message;
+    
+};
+
+Sender::Sender(const std::string& queue, uint count_) : sender(10), count(count_), sent(0)
+{
+    message.getDeliveryProperties().setRoutingKey(queue);
+}
+
+void Sender::execute(AsyncSession& session, bool isRetry)
+{
+    if (isRetry) sender.replay(session);
+    else sender.init(session);
+    while (sent < count) {
+        stringstream message_data;
+        message_data << ++sent;
+        message.setData(message_data.str());
+        message.getHeaders().setInt("sn", sent);
+        sender.send(message);
+        if (count > 1000 && !(sent % 1000)) {
+            std::cout << "sent " << sent << " of " << count << std::endl;
+        }
+    }
+    message.setData("That's all, folks!");
+    sender.send(message);
+}
+
+uint Sender::getSent()
+{
+    return sent;
+}
+
+int main(int argc, char ** argv) 
+{
+    ConnectionSettings settings;
+    if (argc > 1) settings.host = argv[1];
+    if (argc > 2) settings.port = atoi(argv[2]);
+    
+    FailoverManager connection(settings);
+    Sender sender("message_queue", argc > 3 ? atoi(argv[3]) : 1000);
+    try {
+        connection.execute(sender);
+        std::cout << "Sent " << sender.getSent() << " messages." << std::endl;
+        connection.close();
+        return 0;  
+    } catch(const std::exception& error) {
+        std::cout << "Failed: " << error.what() << std::endl;
+    }
+    return 1;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/replaying_sender.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/replaying_sender.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp?rev=711256&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp Tue Nov  4 05:59:44 2008
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+
+class Listener : public MessageListener, public FailoverManager::Command
+{
+  public:
+    Listener();
+    void received(Message& message);
+    void execute(AsyncSession& session, bool isRetry);
+  private:
+    Subscription subscription;
+    uint count;
+    uint skipped;
+    uint lastSn;
+};
+
+Listener::Listener() : count(0), skipped(0), lastSn(0) {}
+
+void Listener::received(Message & message) 
+{
+    if (message.getData() == "That's all, folks!") {
+        std::cout << "Shutting down listener for " << message.getDestination()
+                  << std::endl;
+
+        std::cout << "Listener received " << count << " messages (" << skipped << " skipped)" << std::endl;
+        subscription.cancel();
+    } else {
+        uint sn = message.getHeaders().getAsInt("sn");
+        if (lastSn < sn) {
+            if (sn - lastSn > 1) {
+                std::cout << "Warning: gap in sequence between " << lastSn << " and " << sn << std::endl;
+            }
+            lastSn = sn;
+            ++count;
+        } else {
+            ++skipped;
+        }
+    }
+}
+
+void Listener::execute(AsyncSession& session, bool isRetry)
+{
+    if (isRetry) {
+        std::cout << "Resuming from " << count << std::endl;
+    }
+    SubscriptionManager subs(session);
+    subscription = subs.subscribe(*this, "message_queue");
+    subs.run();
+}
+
+int main(int argc, char ** argv)
+{
+    ConnectionSettings settings;
+    if (argc > 1) settings.host = argv[1];
+    if (argc > 2) settings.port = atoi(argv[2]);
+    
+    FailoverManager connection(settings);
+    Listener listener;
+
+    try {
+        connection.execute(listener);
+        connection.close();
+        std::cout << "Completed without error." << std::endl;
+        return 0;
+    } catch(const std::exception& error) {
+        std::cout << "Failure: " << error.what() << std::endl;
+    }
+    return 1;
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/resuming_receiver.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=711256&r1=711255&r2=711256&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Nov  4 05:59:44 2008
@@ -381,6 +381,7 @@
   qpid/client/Demux.cpp				\
   qpid/client/Dispatcher.cpp			\
   qpid/client/FailoverConnection.cpp            \
+  qpid/client/FailoverManager.cpp            \
   qpid/client/FailoverSession.cpp               \
   qpid/client/FailoverSubscriptionManager.cpp   \
   qpid/client/FailoverListener.h		\
@@ -393,6 +394,7 @@
   qpid/client/LocalQueue.cpp			\
   qpid/client/Message.cpp			\
   qpid/client/MessageListener.cpp		\
+  qpid/client/MessageReplayTracker.cpp		\
   qpid/client/QueueOptions.cpp			\
   qpid/client/Results.cpp			\
   qpid/client/SessionBase_0_10.cpp		\
@@ -519,6 +521,7 @@
   qpid/client/Dispatcher.h \
   qpid/client/Execution.h \
   qpid/client/FailoverConnection.h \
+  qpid/client/FailoverManager.h \
   qpid/client/FailoverSession.h \
   qpid/client/Subscription.h \
   qpid/client/SubscriptionImpl.h \
@@ -533,6 +536,7 @@
   qpid/client/QueueOptions.h \
   qpid/client/Message.h \
   qpid/client/MessageListener.h \
+  qpid/client/MessageReplayTracker.h \
   qpid/client/Results.h \
   qpid/client/SessionBase_0_10.h \
   qpid/client/Session.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp?rev=711256&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp Tue Nov  4 05:59:44 2008
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FailoverManager.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+
+
+namespace qpid {
+namespace client {
+
+using qpid::sys::Monitor;
+
+FailoverManager::FailoverManager(const ConnectionSettings& s) : settings(s), state(IDLE) {}
+
+void FailoverManager::execute(Command& c)
+{
+    bool retry = false;
+    bool completed = false;
+    while (!completed) {
+        try {
+            AsyncSession session = connect().newSession();
+            c.execute(session, retry);
+            session.sync();//TODO: shouldn't be required, but seems there is a bug in session
+            session.close();
+            completed = true;
+        } catch(const TransportFailure&) {
+            retry = true;
+        }            
+    }
+}
+
+void FailoverManager::close()
+{
+    Monitor::ScopedLock l(lock);
+    connection.close();
+}
+
+Connection& FailoverManager::connect(std::vector<Url> brokers)
+{
+    Monitor::ScopedLock l(lock);
+    if (state == CANT_CONNECT) {
+        state = IDLE;//retry
+    }
+    while (!connection.isOpen()) {
+        if (state == CONNECTING) {
+            lock.wait();
+        } else if (state == CANT_CONNECT) {
+            throw CannotConnectException("Cannot establish a connection");
+        } else {
+            state = CONNECTING;
+            Connection c;
+            attempt(c, settings, brokers.empty() ? connection.getKnownBrokers() : brokers);
+            if (c.isOpen()) state = IDLE;
+            else state = CANT_CONNECT;
+            connection = c;
+            lock.notifyAll();
+        }
+    }
+    return connection;
+}
+
+Connection& FailoverManager::getConnection()
+{
+    Monitor::ScopedLock l(lock);
+    return connection;
+}
+
+void FailoverManager::attempt(Connection& c, ConnectionSettings s, std::vector<Url> urls)
+{
+    Monitor::ScopedUnlock u(lock);
+    if (urls.empty()) {
+        attempt(c, s);
+    } else {
+        for (std::vector<Url>::const_iterator i = urls.begin(); i != urls.end() && !c.isOpen(); ++i) {
+            for (Url::const_iterator j = i->begin(); j != i->end() && !c.isOpen(); ++j) {
+                const TcpAddress* tcp = j->get<TcpAddress>();
+                if (tcp) {
+                    s.host = tcp->host;
+                    s.port = tcp->port;
+                    attempt(c, s);
+                }
+            }
+        }
+    }
+}
+
+void FailoverManager::attempt(Connection& c, ConnectionSettings s)
+{
+    try {
+        c.open(s);
+    } catch (const Exception& e) {
+        QPID_LOG(info, "Could not connect to " << s.host << " on " << s.port << ": " << e.what());
+    }
+}
+
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h?rev=711256&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h Tue Nov  4 05:59:44 2008
@@ -0,0 +1,70 @@
+#ifndef QPID_CLIENT_FAILOVERMANAGER_H
+#define QPID_CLIENT_FAILOVERMANAGER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Connection.h"
+#include "ConnectionSettings.h"
+#include "qpid/Exception.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/sys/Monitor.h"
+#include <vector>
+
+namespace qpid {
+namespace client {
+
+struct CannotConnectException : qpid::Exception 
+{
+    CannotConnectException(const std::string& m) : qpid::Exception(m) {}
+};
+
+/**
+ * Utility to handle reconnection.
+ */
+class FailoverManager
+{
+  public:
+    struct Command
+    {
+        virtual void execute(AsyncSession& session, bool isRetry) = 0;
+        virtual ~Command() {}
+    };
+
+    FailoverManager(const ConnectionSettings& settings);
+    Connection& connect(std::vector<Url> brokers = std::vector<Url>());
+    Connection& getConnection();
+    void close();
+    void execute(Command&);
+  private:
+    enum State {IDLE, CONNECTING, CANT_CONNECT};
+
+    qpid::sys::Monitor lock;
+    Connection connection;
+    ConnectionSettings settings;
+    State state;
+
+    void attempt(Connection&, ConnectionSettings settings, std::vector<Url> urls);
+    void attempt(Connection&, ConnectionSettings settings);
+};
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_FAILOVERMANAGER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverManager.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp?rev=711256&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp Tue Nov  4 05:59:44 2008
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageReplayTracker.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace client {
+
+MessageReplayTracker::MessageReplayTracker(uint f) : flushInterval(f), count(0) {}
+
+void MessageReplayTracker::send(const Message& message, const std::string& destination)
+{
+    ReplayRecord record(message, destination);
+    record.send(*this);
+    if (flushInterval && ++count >= flushInterval) {
+        checkCompletion();
+        if (!buffer.empty()) session.flush();
+    }
+}
+void MessageReplayTracker::init(AsyncSession s)
+{
+    session = s;
+}
+
+void MessageReplayTracker::replay(AsyncSession s)
+{
+    session = s;
+    std::list<ReplayRecord> copy; 
+    buffer.swap(copy);
+    std::for_each(copy.begin(), copy.end(), boost::bind(&ReplayRecord::send, _1, boost::ref(*this)));
+    session.flush();
+    count = 0;
+}
+
+void MessageReplayTracker::setFlushInterval(uint f)
+{
+    flushInterval = f;
+}
+
+uint MessageReplayTracker::getFlushInterval()
+{
+    return flushInterval;
+}
+
+void MessageReplayTracker::checkCompletion()
+{
+    buffer.remove_if(boost::bind(&ReplayRecord::isComplete, _1));    
+}
+
+MessageReplayTracker::ReplayRecord::ReplayRecord(const Message& m, const std::string& d) : message(m), destination(d) {}
+
+void MessageReplayTracker::ReplayRecord::send(MessageReplayTracker& tracker)
+{
+    status = tracker.session.messageTransfer(arg::destination=destination, arg::content=message);
+    tracker.buffer.push_back(*this);    
+}
+
+bool MessageReplayTracker::ReplayRecord::isComplete()
+{
+    return status.isComplete();
+}
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h?rev=711256&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h Tue Nov  4 05:59:44 2008
@@ -0,0 +1,66 @@
+#ifndef QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+#define QPID_CLIENT_MESSAGEREPLAYTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "AsyncSession.h"
+#include "Message.h"
+
+#include <list>
+#include <string>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Utility to track messages sent asynchronously, allowing those that
+ * are indoubt to be replayed over a new session.
+ */
+class MessageReplayTracker
+{
+  public:
+    MessageReplayTracker(uint flushInterval);
+    void send(const Message& message, const std::string& destination = "");
+    void init(AsyncSession session);
+    void replay(AsyncSession session);
+    void setFlushInterval(uint interval);
+    uint getFlushInterval();
+    void checkCompletion();
+  private:
+    struct ReplayRecord
+    {
+        Completion status;
+        Message message;
+        std::string destination;
+
+        ReplayRecord(const Message& message, const std::string& destination);
+        void send(MessageReplayTracker&);
+        bool isComplete();
+    };
+
+    AsyncSession session;
+    uint flushInterval;
+    uint count;
+    std::list<ReplayRecord> buffer;
+};
+}} // namespace qpid::client
+
+#endif  /*!QPID_CLIENT_MESSAGEREPLAYTRACKER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageReplayTracker.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message