activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-262 Fix Bridge OOM exception
Date Mon, 19 Oct 2015 13:03:49 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 360338a36 -> b0b567bc8


ARTEMIS-262 Fix Bridge OOM exception

Netty 4.x uses pooled buffers.  These buffers can run out of memory when
transferring large amounts of data over connection.  This was causing an
OutOfMemory exception to be thrown on the CoreBridge when tranferring
large messages.  Netty provides a callback handler to notify listeners
when a Connection is writable.  This patch adds the ability to register
connection writable listeners to the Netty connection and registers the
relevant callback from the Bridge to avoid writing when the buffers are
full.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98c2aa43
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98c2aa43
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98c2aa43

Branch: refs/heads/master
Commit: 98c2aa433f3373533c39eb12b6446bfc8a556aff
Parents: 360338a
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Thu Oct 15 14:39:51 2015 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Mon Oct 19 10:32:59 2015 +0100

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   |  24 ++++
 .../impl/ClientSessionFactoryInternal.java      |   3 +
 .../core/client/impl/ClientSessionImpl.java     |   6 +
 .../core/client/impl/ClientSessionInternal.java |   3 +
 .../core/client/impl/DelegatingSession.java     |   6 +
 .../remoting/impl/netty/NettyConnector.java     |   1 +
 .../core/server/cluster/impl/BridgeImpl.java    |  34 +++++-
 .../integration/cluster/bridge/BridgeTest.java  | 111 ++++++++++++++++++-
 8 files changed, 185 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index ed588ef..6e1ccbd 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -147,6 +147,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
    private String liveNodeID;
 
+   private Set<ConnectionLifeCycleListener> lifeCycleListeners;
+
+   // We need to cache this value here since some listeners may be registered after connectionReadyForWrites
was called.
+   private boolean connectionReadyForWrites;
+
+   private final Object connectionReadyLock = new Object();
+
    public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
                                    final TransportConfiguration connectorConfig,
                                    final long callTimeout,
@@ -214,6 +221,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
 
       confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize()
< 0);
 
+      lifeCycleListeners = new HashSet<ConnectionLifeCycleListener>();
+
+      connectionReadyForWrites = true;
    }
 
    public void disableFinalizeCheck() {
@@ -225,6 +235,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
       return newFailoverLock;
    }
 
+   @Override
+   public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
+      synchronized (connectionReadyLock) {
+         lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(),
connectionReadyForWrites);
+         lifeCycleListeners.add(lifeCycleListener);
+      }
+   }
+
    public void connect(final int initialConnectAttempts,
                        final boolean failoverOnInitialConnection) throws ActiveMQException
{
       // Get the connection
@@ -356,6 +374,12 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal,
C
    }
 
    public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
+      synchronized (connectionReadyLock) {
+         connectionReadyForWrites = ready;
+         for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) {
+            lifeCycleListener.connectionReadyForWrites(connectionID, ready);
+         }
+      }
    }
 
    public synchronized int numConnections() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
index ba2dab7..3671618 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
 
 public interface ClientSessionFactoryInternal extends ClientSessionFactory {
@@ -57,4 +58,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory
{
    ConfirmationWindowWarning getConfirmationWindowWarning();
 
    Lock lockFailover();
+
+   void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 5b028b2..acb0725 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
@@ -631,6 +632,11 @@ public final class ClientSessionImpl implements ClientSessionInternal,
FailureLi
       return sessionFactory.getLiveNodeId();
    }
 
+   @Override
+   public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
+      sessionFactory.addLifeCycleListener(lifeCycleListener);
+   }
+
    // ClientSessionInternal implementation
    // ------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index 3a85245..06d6024 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
 
 public interface ClientSessionInternal extends ClientSession {
@@ -122,4 +123,6 @@ public interface ClientSessionInternal extends ClientSession {
    boolean isClosing();
 
    String getNodeId();
+
+   void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
index a4296bc..2d6f4a4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 
@@ -97,6 +98,11 @@ public class DelegatingSession implements ClientSessionInternal {
       session.acknowledge(consumer, message);
    }
 
+   @Override
+   public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
+      session.addLifeCycleListener(lifeCycleListener);
+   }
+
    public void individualAcknowledge(final ClientConsumer consumer, final Message message)
throws ActiveMQException {
       session.individualAcknowledge(consumer, message);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 7dad143..decac89 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -924,6 +924,7 @@ public class NettyConnector extends AbstractConnector {
       }
 
       public void connectionReadyForWrites(Object connectionID, boolean ready) {
+         listener.connectionReadyForWrites(connectionID, ready);
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index eb99fcc..a8b048c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -57,6 +58,8 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.TypedProperties;
@@ -66,7 +69,7 @@ import org.apache.activemq.artemis.utils.UUID;
  * A Core BridgeImpl
  */
 
-public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler
{
+public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler,
ConnectionLifeCycleListener {
    // Constants -----------------------------------------------------
 
    private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -132,6 +135,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
    private volatile ClientProducer producer;
 
+   private volatile boolean connectionWritable = false;
+
    private volatile boolean started;
 
    private volatile boolean stopping = false;
@@ -481,7 +486,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
 
       synchronized (this) {
-         if (!active) {
+         if (!active || !connectionWritable) {
             if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
                ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as
it is set to inactive ref=" + ref);
             }
@@ -532,6 +537,29 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
       }
    }
 
+   @Override
+   public void connectionCreated(ActiveMQComponent component, Connection connection, String
protocol) {
+
+   }
+
+   @Override
+   public void connectionDestroyed(Object connectionID) {
+
+   }
+
+   @Override
+   public void connectionException(Object connectionID, ActiveMQException me) {
+
+   }
+
+   @Override
+   public void connectionReadyForWrites(Object connectionID, boolean ready) {
+      connectionWritable = ready;
+      if (connectionWritable) {
+         queue.deliverAsync();
+      }
+   }
+
    // FailureListener implementation --------------------------------
 
    public void proceedDeliver(MessageReference ref) {
@@ -840,6 +868,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
 
             session.setSendAcknowledgementHandler(BridgeImpl.this);
 
+            session.addLifeCycleListener(BridgeImpl.this);
+
             afterConnect();
 
             active = true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98c2aa43/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index f7592be..37338aa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -137,6 +137,115 @@ public class BridgeTest extends ActiveMQTestBase {
       internaltestSimpleBridge(true, true);
    }
 
+   @Test
+   public void testLargeMessageBridge() throws Exception {
+      long time = System.currentTimeMillis();
+      Map<String, Object> server0Params = new HashMap<String, Object>();
+      server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+      addTargetParameters(server1Params);
+      server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+      final String queueName1 = "queue1";
+
+      // Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+      TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+      HashMap<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      connectors.put(server1tc.getName(), server1tc);
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+
+//      final int messageSize = 1024 * 1024 * 5;
+      final int messageSize = 1024 * 10;
+
+      final int numMessages = 100000;
+
+      ArrayList<String> connectorConfig = new ArrayList<String>();
+      connectorConfig.add(server1tc.getName());
+      BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages
* messageSize / 2).setStaticConnectors(connectorConfig);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
+      List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+      server1.start();
+      server0.start();
+      locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+      ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+
+      ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+
+      ClientSession session0 = sf0.createSession(false, true, true);
+
+      ClientSession session1 = sf1.createSession(false, true, true);
+
+      ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+      session1.start();
+
+      final byte[] bytes = new byte[messageSize];
+
+      final SimpleString propKey = new SimpleString("testkey");
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(true);
+
+         message.putIntProperty(propKey, i);
+
+         message.getBodyBuffer().writeBytes(bytes);
+
+         producer0.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer1.receive(500000);
+
+         Assert.assertNotNull(message);
+
+         Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+         readLargeMessages(message, 10);
+
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumer1.receiveImmediate());
+
+      session0.close();
+
+      session1.close();
+
+      sf0.close();
+
+      sf1.close();
+
+      closeFields();
+      if (server0.getConfiguration().isPersistenceEnabled()) {
+         assertEquals(0, loadQueues(server0).size());
+      }
+      long timeTaken = System.currentTimeMillis() - time;
+      System.out.println(timeTaken + "ms");
+   }
+
+
    public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles)
throws Exception {
       Map<String, Object> server0Params = new HashMap<String, Object>();
       server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
@@ -161,7 +270,7 @@ public class BridgeTest extends ActiveMQTestBase {
 
       final int messageSize = 1024;
 
-      final int numMessages = 10;
+      final int numMessages = 10000;
 
       ArrayList<String> connectorConfig = new ArrayList<String>();
       connectorConfig.add(server1tc.getName());


Mime
View raw message