activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5822
Date Wed, 03 Jun 2015 14:11:14 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 9810e61b1 -> 848adc4b5


https://issues.apache.org/jira/browse/AMQ-5822

Update the receive counter on reads to avoid dropping connections
wrongly.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/848adc4b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/848adc4b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/848adc4b

Branch: refs/heads/master
Commit: 848adc4b5d0230b76686616724ae918527ec80e5
Parents: 9810e61
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Jun 3 09:54:02 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Jun 3 10:10:48 2015 -0400

----------------------------------------------------------------------
 .../activemq/transport/nio/NIOTransport.java    | 14 +++-
 .../org/apache/activemq/bugs/AMQ6000Test.java   | 83 ++++++++++++++++++++
 2 files changed, 93 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/848adc4b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
index 4cff500..6f7a1af 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
@@ -29,7 +29,6 @@ import java.nio.channels.SocketChannel;
 
 import javax.net.SocketFactory;
 
-import org.apache.activemq.command.Command;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransport;
@@ -39,8 +38,8 @@ import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
- * 
- * 
+ *
+ *
  */
 public class NIOTransport extends TcpTransport {
 
@@ -59,16 +58,19 @@ public class NIOTransport extends TcpTransport {
         super(wireFormat, socket);
     }
 
+    @Override
     protected void initializeStreams() throws IOException {
         channel = socket.getChannel();
         channel.configureBlocking(false);
 
         // listen for events telling us when the socket is readable.
         selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener()
{
+            @Override
             public void onSelect(SelectorSelection selection) {
                 serviceRead();
             }
 
+            @Override
             public void onError(SelectorSelection selection, Throwable error) {
                 if (error instanceof IOException) {
                     onException((IOException)error);
@@ -103,6 +105,8 @@ public class NIOTransport extends TcpTransport {
                     break;
                 }
 
+                this.receiveCounter += readSize;
+
                 if (currentBuffer.hasRemaining()) {
                     continue;
                 }
@@ -135,7 +139,7 @@ public class NIOTransport extends TcpTransport {
                     currentBuffer.flip();
 
                     Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
-                    doConsume((Command)command);
+                    doConsume(command);
 
                     nextFrameSize = -1;
                     inputBuffer.clear();
@@ -152,12 +156,14 @@ public class NIOTransport extends TcpTransport {
         }
     }
 
+    @Override
     protected void doStart() throws Exception {
         connect();
         selection.setInterestOps(SelectionKey.OP_READ);
         selection.enable();
     }
 
+    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (selection != null) {
             selection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/848adc4b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6000Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6000Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6000Test.java
new file mode 100644
index 0000000..39c6fe8
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6000Test.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ6000Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6000Test.class);
+
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.addConnector("nio://0.0.0.0:0?"
+            + "wireFormat.maxInactivityDurationInitalDelay=1000&wireFormat.maxInactivityDuration=300");
+        brokerService.start();
+
+        connectionUri = brokerService.getTransportConnectorByScheme("nio").getPublishableConnectString();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Test
+    public void testReadCounter() throws Exception {
+        LOG.info("Connecting to: {}", connectionUri);
+
+        byte[] payload = new byte[50 * 1024 * 1024];
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri +
"?useInactivityMonitor=false");
+        final ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
+        connection.start();
+
+        LOG.info("Connected to: {}", connection.getTransport());
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        BytesMessage message = session.createBytesMessage();
+
+        message.writeBytes(payload);
+
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        producer.send(message);
+
+        connection.close();
+    }
+}


Mime
View raw message