activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-2055 Set Live LM to Null after route
Date Mon, 27 Aug 2018 12:16:00 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x 4a7b5252f -> dc2e4dd54


ARTEMIS-2055 Set Live LM to Null after route

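The ServerSessionPacketHandler has a close() callback handler which will
delete any pending large messages.  However, there is a race where a
large message can be routed and the close() callback then deletes the
associated large message, resulting in data loss.

This change clears the handler's currentLargeMessage reference before
calling doSend(), so a message that is being routed is no longer held
as the pending large message.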

(cherry picked from commit 490ef71e1dccc88e51d862cc51af468d37d416ce)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f2d26dc1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f2d26dc1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f2d26dc1

Branch: refs/heads/2.6.x
Commit: f2d26dc1be26ea1d7c7ac505403d26a045ca5da9
Parents: 4a7b525
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Aug 27 09:18:02 2018 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Aug 27 08:15:49 2018 -0400

----------------------------------------------------------------------
 .../core/ServerSessionPacketHandler.java        |   4 +-
 .../byteman/LargeMessageOnShutdownTest.java     | 137 +++++++++++++++++++
 2 files changed, 139 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f2d26dc1/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 36273f8..3b0433e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -974,9 +974,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
          }
 
-         session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false, false);
-
+         LargeServerMessage message = currentLargeMessage;
          currentLargeMessage = null;
+         session.doSend(session.getCurrentTransaction(), message, null, false, false);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f2d26dc1/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
new file mode 100644
index 0000000..ebbd7de
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/LargeMessageOnShutdownTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.io.ByteArrayInputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class LargeMessageOnShutdownTest extends ActiveMQTestBase {
+
+   private static final SimpleString queueName = new SimpleString("largeMessageShutdownQueue");
+   private static ActiveMQServer server;
+
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server = createServer(true, createDefaultNettyConfig());
+      startServer();
+      server.createQueue(queueName, RoutingType.MULTICAST, queueName, null, true, false);
+   }
+
+   @After
+   public void tearDown() throws Exception {
+      super.tearDown();
+      stopServer();
+   }
+
+   @Test
+   @BMRules(
+      rules = {
+         @BMRule(
+            name = "BlockOnFinalLargeMessagePacket",
+            targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+            targetMethod = "doSend(Transaction,Message,SimpleString,boolean,boolean)",
+            targetLocation = "EXIT",
+            condition = "!flagged(\"testLargeMessageOnShutdown\")",
+            action =
+               "org.apache.activemq.artemis.tests.extras.byteman.LargeMessageOnShutdownTest.stopServer();" +
+               "waitFor(\"testLargeMessageOnShutdown\");" +
+               "flag(\"testLargeMessageOnShutdown\")"
+         ),
+         @BMRule(
+            name = "ReleaseBlockOnSessionCleanup",
+            targetClass = "org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler",
+            targetMethod = "clearLargeMessage()",
+            targetLocation = "EXIT",
+            action = "signalWake(\"testLargeMessageOnShutdown\")"
+         )
+      }
+   )
+   public void testLargeMessageOnShutdown() throws Exception {
+
+      byte[] payload = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2];
+
+      // Send Large Message
+      ClientSessionFactory csf1 = createSessionFactory(createNettyNonHALocator());
+      try {
+         ClientSession session1 = csf1.createSession();
+         ClientProducer producer1 = session1.createProducer(queueName);
+         ClientMessage message = session1.createMessage(true);
+
+         message.setBodyInputStream(new ByteArrayInputStream(payload));
+         producer1.send(message);
+      } catch (Exception e) {
+         // Expected due to shutdown.
+      }
+      finally {
+         csf1.close();
+      }
+
+      waitForStoppedServer();
+      startServer();
+
+      // Consume Large Message
+      ClientSessionFactory csf2 = createSessionFactory(createNettyNonHALocator());
+      try {
+         ClientSession session2 = csf2.createSession();
+         session2.start();
+         ClientConsumer consumer2 = session2.createConsumer(queueName);
+         ClientMessage rmessage = consumer2.receive(10000);
+
+         assertEquals(payload.length, rmessage.getBodyBuffer().readableBytes());
+         assertEqualsBuffers(payload.length, ActiveMQBuffers.wrappedBuffer(payload), rmessage.getBodyBuffer());
+      } finally {
+         csf2.close();
+      }
+   }
+
+   public static void stopServer() throws Exception {
+      server.stop();
+      waitForStoppedServer();
+   }
+
+   public static void startServer() throws Exception {
+      server.start();
+      server.waitForActivation(30, TimeUnit.SECONDS);
+   }
+
+   public static void waitForStoppedServer() throws Exception {
+      Wait.waitFor(() -> server.getState() == ActiveMQServer.SERVER_STATE.STOPPED);
+   }
+}


Mime
View raw message