activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQCPP-543
Date Thu, 14 Aug 2014 22:11:57 GMT
Repository: activemq-cpp
Updated Branches:
  refs/heads/trunk 6b9bd99a1 -> b8a98a3c0


https://issues.apache.org/jira/browse/AMQCPP-543

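Implemented producer window flow control based on the provided patch, with some tweaks: the connection now records the OpenWire protocol version reported in the broker's WireFormatInfo, and a producer creates its MemoryUsage window when that version is >= 3 and the producer window size is > 0.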

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/b8a98a3c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/b8a98a3c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/b8a98a3c

Branch: refs/heads/trunk
Commit: b8a98a3c0d8887045ed717c0bbb1c132f383eef5
Parents: 6b9bd99
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Aug 14 18:09:46 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Thu Aug 14 18:09:46 2014 -0400

----------------------------------------------------------------------
 .../src/main/activemq/core/ActiveMQConnection.cpp  | 17 ++++++++++++++++-
 .../src/main/activemq/core/ActiveMQConnection.h    |  8 ++++++++
 .../core/kernels/ActiveMQProducerKernel.cpp        |  7 +++++--
 .../activemq/wireformat/openwire/OpenWireFormat.h  |  2 +-
 4 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index ce30892..7cd9061 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -37,6 +37,7 @@
 #include <activemq/util/IdGenerator.h>
 #include <activemq/transport/failover/FailoverTransport.h>
 #include <activemq/transport/ResponseCallback.h>
+#include <activemq/wireformat/openwire/OpenWireFormat.h>
 
 #include <decaf/lang/Math.h>
 #include <decaf/lang/Boolean.h>
@@ -83,6 +84,7 @@ using namespace activemq::exceptions;
 using namespace activemq::threads;
 using namespace activemq::transport;
 using namespace activemq::transport::failover;
+using namespace activemq::wireformat::openwire;
 using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::util;
@@ -199,6 +201,7 @@ namespace core {
         Pointer<commands::BrokerInfo> brokerInfo;
         Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
         Pointer<AtomicInteger> transportInterruptionProcessingComplete;
+        Pointer<AtomicInteger> protocolVersion;
         Pointer<CountDownLatch> brokerInfoReceived;
         Pointer<AdvisoryConsumer> advisoryConsumer;
 
@@ -286,6 +289,7 @@ namespace core {
             connectionId->setValue(uniqueId);
 
             this->transportInterruptionProcessingComplete.reset(new AtomicInteger());
+            this->protocolVersion.reset(new AtomicInteger(OpenWireFormat::MAX_SUPPORTED_VERSION));
             this->executor.reset(
                 new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS,
                     new LinkedBlockingQueue<Runnable*>(),
@@ -1083,7 +1087,7 @@ void ActiveMQConnection::onCommand(const Pointer<Command> command) {
             }
 
         } else if (command->isWireFormatInfo()) {
-            this->config->brokerWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+            this->onWireFormatInfo(command);
         } else if (command->isBrokerInfo()) {
             this->config->brokerInfo = command.dynamicCast<BrokerInfo>();
             this->config->brokerInfoReceived->countDown();
@@ -1116,6 +1120,12 @@ void ActiveMQConnection::onCommand(const Pointer<Command> command) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onWireFormatInfo(Pointer<commands::Command> command AMQCPP_UNUSED) {
+    this->config->brokerWireFormatInfo = command.dynamicCast<WireFormatInfo>();
+    this->config->protocolVersion->set(this->config->brokerWireFormatInfo->getVersion());
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::onControlCommand(Pointer<commands::Command> command AMQCPP_UNUSED) {
     // Don't need to do anything yet as close and shutdown are applicable yet.
 }
@@ -1909,3 +1919,8 @@ bool ActiveMQConnection::isAlwaysSessionAsync() const {
 void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) {
     this->config->alwaysSessionAsync = alwaysSessionAsync;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnection::getProtocolVersion() const {
+    return this->config->protocolVersion->get();
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 08e5d24..12fd1d4 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -799,6 +799,11 @@ namespace core {
          */
         void setAlwaysSessionAsync(bool alwaysSessionAsync);
 
+        /**
+         * @returns the current connection's OpenWire protocol version.
+         */
+        int getProtocolVersion() const;
+
     public: // TransportListener
 
         /**
@@ -1066,6 +1071,9 @@ namespace core {
         // Allow subclasses to access the original Properties object for this connection.
         const decaf::util::Properties& getProperties() const;
 
+        // Process the WireFormatInfo command
+        void onWireFormatInfo(Pointer<commands::Command> command);
+
         // Process the ControlCommand command
         void onControlCommand(Pointer<commands::Command> command);
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
index 902f302..848a026 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
@@ -81,8 +81,11 @@ ActiveMQProducerKernel::ActiveMQProducerKernel(ActiveMQSessionKernel* session,
         this->destination = destination.dynamicCast<cms::Destination>();
     }
 
-    // TODO - Check for need of MemoryUsage if there's a producer Windows size
-    //        and the Protocol version is greater than 3.
+    // Enable producer window flow control if protocol >= 3 and the window
+    // size > 0
+    if (session->getConnection()->getProtocolVersion() >= 3 && session->getConnection()->getProducerWindowSize() > 0) {
+        this->memoryUsage.reset(new MemoryUsage(session->getConnection()->getProducerWindowSize()));
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h b/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
index 8ad0847..e9e76bd 100644
--- a/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
+++ b/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h
@@ -41,7 +41,7 @@ namespace marshal {
     using decaf::lang::Pointer;
 
     class AMQCPP_API OpenWireFormat : public wireformat::WireFormat {
-    protected:
+    public:
 
         // Declared here to make life easier.
         static const unsigned char NULL_TYPE;


Mime
View raw message