activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [1/4] activemq-6 git commit: ACTIVEMQ6-94: HornetQ Bridge does not handle large messages
Date Thu, 23 Apr 2015 20:03:12 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master 16137eabc -> f1faacb11


ACTIVEMQ6-94: HornetQ Bridge does not handle large messages

  When sending a large message whose size exceeds
Integer.MAX_VALUE, the bridge gets a negative chunk size during
forwarding. Also, the resend cache is not bounded, so there is a
risk of running into an OutOfMemoryError.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/c1111cc1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/c1111cc1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/c1111cc1

Branch: refs/heads/master
Commit: c1111cc156684b15938ab3f8e34df9f4b64f57c4
Parents: 147a552
Author: Howard Gao <hgao@redhat.com>
Authored: Mon Mar 30 14:09:34 2015 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Apr 23 15:02:01 2015 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientProducerImpl.java    |   6 +-
 .../activemq/core/protocol/core/Channel.java    |  11 +
 .../core/impl/ActiveMQSessionContext.java       |  30 +++
 .../core/protocol/core/impl/ChannelImpl.java    |  68 ++++--
 .../spi/core/remoting/SessionContext.java       |   2 +
 .../integration/cluster/bridge/BridgeTest.java  | 234 +++++++++++++++++++
 .../cluster/util/BackupSyncDelay.java           |   6 +
 7 files changed, 338 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/c1111cc1/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java
b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java
index 7623cdb..3ff5fae 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/client/impl/ClientProducerImpl.java
@@ -415,11 +415,11 @@ public class ClientProducerImpl implements ClientProducerInternal
       try
       {
 
-         for (int pos = 0; pos < bodySize; )
+         for (long pos = 0; pos < bodySize; )
          {
             final boolean lastChunk;
 
-            final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize);
+            final int chunkLength = (int)Math.min((bodySize - pos), (long)minLargeMessageSize);
 
             final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength);
 
@@ -430,7 +430,7 @@ public class ClientProducerImpl implements ClientProducerInternal
             lastChunk = pos >= bodySize;
             SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
 
-            int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking,
lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
+            int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking,
lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
 
             try
             {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/c1111cc1/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
index 7546952..c876419 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Channel.java
@@ -201,4 +201,15 @@ public interface Channel
     * @param transferring whether the channel is transferring
     */
    void setTransferring(boolean transferring);
+
+   /**
+    * For large message server send, each entry in the resend cache holds a reference to
+    * a chunk of bytes, which can cause OOM if the cache builds up quickly. This method
+    * makes sure the resend cache size can't be more than one by blocking the call.
+    *
+    * @param timeout max waiting time for the resend cache
+    *
+    * @return true if the resend cache gets cleared
+    */
+   boolean largeServerCheck(long timeout);
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/c1111cc1/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
index 8df0011..a3d532d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -109,6 +109,7 @@ import static org.apache.activemq.core.protocol.core.impl.PacketImpl.SESS_RECEIV
 
 public class ActiveMQSessionContext extends SessionContext
 {
+   private static final long MAX_RESENDCACHE_WAITING_TIME = 10000L;//10 sec
    private final Channel sessionChannel;
    private final int serverVersion;
    private int confirmationWindow;
@@ -428,6 +429,27 @@ public class ActiveMQSessionContext extends SessionContext
    {
       final boolean requiresResponse = lastChunk && sendBlocking;
       final SessionSendContinuationMessage chunkPacket =
+              new SessionSendContinuationMessage(msgI, chunk, !lastChunk,
+                      requiresResponse, messageBodySize, messageHandler);
+
+      if (requiresResponse)
+      {
+         // When sending it blocking, only the last chunk will be blocking.
+         sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
+      }
+      else
+      {
+         sessionChannel.send(chunkPacket);
+      }
+
+      return chunkPacket.getPacketSize();
+   }
+
+   @Override
+   public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean
sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler)
throws ActiveMQException
+   {
+      final boolean requiresResponse = lastChunk && sendBlocking;
+      final SessionSendContinuationMessage chunkPacket =
          new SessionSendContinuationMessage(msgI, chunk, !lastChunk,
                                             requiresResponse, messageBodySize, messageHandler);
 
@@ -439,6 +461,14 @@ public class ActiveMQSessionContext extends SessionContext
       else
       {
          sessionChannel.send(chunkPacket);
+         if (!sessionChannel.largeServerCheck(MAX_RESENDCACHE_WAITING_TIME))
+         {
+            ActiveMQClientLogger.LOGGER.warn("Bridge detected that the target server is slow
to " +
+                    " send back chunk confirmations. It 's possible the bridge may take more
memory" +
+                    " during sending of a large message. It may be a temporary situation
if this warning" +
+                    " occasionally shows up.");
+         }
+
       }
 
       return chunkPacket.getPacketSize();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/c1111cc1/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
index a71aed5..25f6f81 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ChannelImpl.java
@@ -226,6 +226,27 @@ public final class ChannelImpl implements Channel
       this.transferring = transferring;
    }
 
+   @Override
+   public boolean largeServerCheck(long timeout)
+   {
+      if (resendCache == null) return true;
+
+      synchronized (resendCache)
+      {
+         if (resendCache.size() >= 1)
+         {
+            try
+            {
+               resendCache.wait(timeout);
+            }
+            catch (InterruptedException e)
+            {
+            }
+         }
+      }
+      return resendCache.size() == 0;
+   }
+
    // This must never called by more than one thread concurrently
    public boolean send(final Packet packet, final boolean flush, final boolean batch)
    {
@@ -607,7 +628,12 @@ public final class ChannelImpl implements Channel
 
          firstStoredCommandID = 0;
 
-         resendCache.clear();
+         synchronized (resendCache)
+         {
+            resendCache.clear();
+            resendCache.notifyAll();
+         }
+
       }
    }
 
@@ -672,28 +698,38 @@ public final class ChannelImpl implements Channel
 
       int sizeToFree = 0;
 
-      for (int i = 0; i < numberToClear; i++)
+      try
       {
-         final Packet packet = resendCache.poll();
-
-         if (packet == null)
+         for (int i = 0; i < numberToClear; i++)
          {
-            if (lastReceivedCommandID > 0)
+            final Packet packet = resendCache.poll();
+
+            if (packet == null)
             {
-               ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID,
firstStoredCommandID);
+               if (lastReceivedCommandID > 0)
+               {
+                  ActiveMQClientLogger.LOGGER.cannotFindPacketToClear(lastReceivedCommandID,
firstStoredCommandID);
+               }
+               firstStoredCommandID = lastReceivedCommandID + 1;
+               return;
             }
-            firstStoredCommandID = lastReceivedCommandID + 1;
-            return;
-         }
 
-         if (packet.getType() != PacketImpl.PACKETS_CONFIRMED)
-         {
-            sizeToFree += packet.getPacketSize();
-         }
+            if (packet.getType() != PacketImpl.PACKETS_CONFIRMED)
+            {
+               sizeToFree += packet.getPacketSize();
+            }
 
-         if (commandConfirmationHandler != null)
+            if (commandConfirmationHandler != null)
+            {
+               commandConfirmationHandler.commandConfirmed(packet);
+            }
+         }
+      }
+      finally
+      {
+         synchronized (resendCache)
          {
-            commandConfirmationHandler.commandConfirmed(packet);
+            resendCache.notifyAll();
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/c1111cc1/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java
b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java
index af0d4f3..c642837 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/SessionContext.java
@@ -149,6 +149,8 @@ public abstract class SessionContext
 
    public abstract int sendLargeMessageChunk(MessageInternal msgI, long messageBodySize,
boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler)
throws ActiveMQException;
 
+   public abstract int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize,
boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler)
throws ActiveMQException;
+
 
    public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/c1111cc1/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
index f9ac024..ff4adf3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/bridge/BridgeTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.tests.integration.cluster.bridge;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1754,6 +1760,234 @@ public class BridgeTest extends ServiceTestBase
    }
 
    @Test
+   public void testBridgeWithVeryLargeMessage() throws Exception
+   {
+      ActiveMQServer server0 = null;
+      ActiveMQServer server1 = null;
+
+      final int PAGE_MAX = 1024 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+      ServerLocator locator = null;
+      try
+      {
+
+         Map<String, Object> server0Params = new HashMap<String, Object>();
+         server0 = createClusteredServerWithParams(isNetty(), 0, true, PAGE_SIZE, PAGE_MAX,
server0Params);
+
+         Map<String, Object> server1Params = new HashMap<String, Object>();
+         addTargetParameters(server1Params);
+         server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         server0.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<String>();
+         staticConnectors.add(server1tc.getName());
+
+         int minLargeMessageSize = 50 * 1024 * 1024; //50M
+
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
+                 .setName("bridge1")
+                 .setQueueName(queueName0)
+                 .setForwardingAddress(forwardAddress)
+                 .setRetryInterval(1000)
+                 .setReconnectAttemptsOnSameNode(-1)
+                 .setUseDuplicateDetection(false)
+                 .setConfirmationWindowSize(1024)
+                 .setStaticConnectors(staticConnectors)
+                 .setMinLargeMessageSize(minLargeMessageSize);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration()
+                 .setAddress(testAddress)
+                 .setName(queueName0);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
+                 .setAddress(forwardAddress)
+                 .setName(queueName1);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+         server0.start();
+
+         locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc,
server1tc));
+
+         ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
+
+         ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
+
+         ClientSession session0 = sf0.createSession(false, true, true);
+
+         ClientSession session1 = sf1.createSession(false, true, true);
+
+         ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+         ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+         session1.start();
+
+         //create a large message bigger than Integer.MAX_VALUE
+         final long largeMessageSize = 3L * 1024L * 1024L * 1024L;
+
+         File destDir = createDestDir("testBridgeWithVeryLargeMessage");
+         ClientMessage largeMessage = createLargeMessage(session0, largeMessageSize, destDir);
+
+         producer0.send(largeMessage);
+
+         session0.commit();
+
+         //check target queue for large message arriving
+         ClientSession.QueueQuery query = session1.queueQuery(new SimpleString(queueName1));
+         long messageCount = query.getMessageCount();
+         int count = 0;
+         //wait for 300 sec max
+         while (messageCount == 0 && count < 300)
+         {
+            count++;
+            Thread.sleep(1000);
+            query = session1.queueQuery(new SimpleString(queueName1));
+            messageCount = query.getMessageCount();
+         }
+
+         if (messageCount == 0)
+         {
+            fail("large message didn't arrived after 5 min!");
+         }
+
+         //receive the message
+         ClientMessage message = consumer1.receive(5000);
+         message.acknowledge();
+
+         File outputFile = new File(destDir, "huge_message_received.dat");
+
+         System.out.println("-----message save to: " + outputFile.getAbsolutePath());
+         FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
+
+         BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
+
+         message.setOutputStream(bufferedOutput);
+
+         if (!message.waitOutputStreamCompletion(5 * 60 * 1000))
+         {
+            fail("message didn't get received to disk in 5 min. Is the machine slow?");
+         }
+         session1.commit();
+
+         Assert.assertNull(consumer1.receiveImmediate());
+
+         session0.close();
+
+         session1.close();
+
+         sf0.close();
+
+         sf1.close();
+
+      }
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+         try
+         {
+            server0.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            server1.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+      assertEquals(0, loadQueues(server0).size());
+   }
+
+   private File createDestDir(String dirName)
+   {
+      File clientDir = new File(getClientLargeMessagesDir());
+      if (!clientDir.exists())
+      {
+         if (!clientDir.mkdirs())
+         {
+            throw new IllegalStateException("Can't create dir " + clientDir.getAbsolutePath());
+         }
+      }
+
+      File destDir = new File(clientDir, dirName);
+      if (!destDir.mkdir())
+      {
+         throw new IllegalStateException("Can't create dir " + destDir.getAbsolutePath());
+      }
+      return destDir;
+   }
+
+
+   private ClientMessage createLargeMessage(ClientSession session, long largeMessageSize,
File destDir) throws Exception
+   {
+
+      File fileInput = new File(destDir, "huge_message_to_send.dat");
+
+      createFile(fileInput, largeMessageSize);
+
+      System.out.println("File created at: " + fileInput.getAbsolutePath());
+
+      ClientMessage message = session.createMessage(ClientMessage.BYTES_TYPE, true);
+
+      FileInputStream fileInputStream = new FileInputStream(fileInput);
+      BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+
+      message.setBodyInputStream(bufferedInput);
+
+      return message;
+   }
+
+   private static void createFile(final File file, final long fileSize) throws IOException
+   {
+      if (file.exists())
+      {
+         System.out.println("---file already there " + file.length());
+         return;
+      }
+      FileOutputStream fileOut = new FileOutputStream(file);
+      BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
+      byte[] outBuffer = new byte[1024 * 1024];
+      System.out.println(" --- creating file, size: " + fileSize);
+      for (long i = 0; i < fileSize; i += outBuffer.length)
+      {
+         buffOut.write(outBuffer);
+      }
+      buffOut.close();
+   }
+
+   @Test
    public void testNullForwardingAddress() throws Exception
    {
       Map<String, Object> server0Params = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/c1111cc1/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
index 710c4d3..95506d5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/cluster/util/BackupSyncDelay.java
@@ -376,6 +376,12 @@ public class BackupSyncDelay implements Interceptor
       }
 
       @Override
+      public boolean largeServerCheck(long timeout)
+      {
+         return true;
+      }
+
+      @Override
       public boolean supports(byte packetID)
       {
          return true;


Mime
View raw message