activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-2159 Fixing OpenWire Blocker Producer
Date Thu, 01 Nov 2018 21:12:04 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x b427087a7 -> 09d7cb460


ARTEMIS-2159 Fixing OpenWire Blocker Producer

Previous change on Flow control in OpenWire broke Blocked cases
This is a better fix.

(cherry picked from commit c62146802ed1a8a3128e1e3b847a02eac247aa01)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/30bb9dd0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/30bb9dd0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/30bb9dd0

Branch: refs/heads/2.6.x
Commit: 30bb9dd0f4c884750f2678d596010355191f5865
Parents: b427087
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Nov 1 15:33:03 2018 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Nov 1 15:46:50 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQSession.java  | 100 +++++++++----------
 .../artemis/core/paging/PagingStore.java        |   2 +
 .../core/paging/impl/PagingStoreImpl.java       |   8 +-
 .../storage/PersistMultiThreadTest.java         |   5 +
 4 files changed, 63 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/30bb9dd0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index a107ba7..0429297 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -443,63 +443,63 @@ public class AMQSession implements SessionCallback {
                                         final AtomicInteger count,
                                         final org.apache.activemq.artemis.api.core.Message coreMsg,
                                         final SimpleString address) throws ResourceAllocationException {
-      if (!store.checkMemory(null)) {
-         this.connection.getContext().setDontSendReponse(false);
-         connection.enableTtl();
-         throw new ResourceAllocationException("Queue is full " + address);
-      }
+      if (!store.checkMemory(false, () -> {
+         Exception exceptionToSend = null;
 
-      Exception exceptionToSend = null;
-
-      try {
-         getCoreSession().send(coreMsg, false, dest.isTemporary());
-      } catch (Exception e) {
-         logger.warn(e.getMessage(), e);
-         exceptionToSend = e;
-      }
-      connection.enableTtl();
-      if (count == null || count.decrementAndGet() == 0) {
-         if (exceptionToSend != null) {
-            this.connection.getContext().setDontSendReponse(false);
-            connection.sendException(exceptionToSend);
-         } else {
-            server.getStorageManager().afterCompleteOperations(new IOCallback() {
-               @Override
-               public void done() {
-                  if (sendProducerAck) {
-                     try {
-                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-                        connection.dispatchAsync(ack);
-                     } catch (Exception e) {
+         try {
+            getCoreSession().send(coreMsg, false, dest.isTemporary());
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+            exceptionToSend = e;
+         }
+         connection.enableTtl();
+         if (count == null || count.decrementAndGet() == 0) {
+            if (exceptionToSend != null) {
+               this.connection.getContext().setDontSendReponse(false);
+               connection.sendException(exceptionToSend);
+            } else {
+               server.getStorageManager().afterCompleteOperations(new IOCallback() {
+                  @Override
+                  public void done() {
+                     if (sendProducerAck) {
+                        try {
+                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                           connection.dispatchAsync(ack);
+                        } catch (Exception e) {
+                           connection.getContext().setDontSendReponse(false);
+                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                           connection.sendException(e);
+                        }
+                     } else {
                         connection.getContext().setDontSendReponse(false);
-                        ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-                        connection.sendException(e);
-                     }
-                  } else {
-                     connection.getContext().setDontSendReponse(false);
-                     try {
-                        Response response = new Response();
-                        response.setCorrelationId(messageSend.getCommandId());
-                        connection.dispatchAsync(response);
-                     } catch (Exception e) {
-                        ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
-                        connection.sendException(e);
+                        try {
+                           Response response = new Response();
+                           response.setCorrelationId(messageSend.getCommandId());
+                           connection.dispatchAsync(response);
+                        } catch (Exception e) {
+                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                           connection.sendException(e);
+                        }
                      }
                   }
-               }
 
-               @Override
-               public void onError(int errorCode, String errorMessage) {
-                  try {
-                     final IOException e = new IOException(errorMessage);
-                     ActiveMQServerLogger.LOGGER.warn(errorMessage);
-                     connection.serviceException(e);
-                  } catch (Exception ex) {
-                     ActiveMQServerLogger.LOGGER.debug(ex);
+                  @Override
+                  public void onError(int errorCode, String errorMessage) {
+                     try {
+                        final IOException e = new IOException(errorMessage);
+                        ActiveMQServerLogger.LOGGER.warn(errorMessage);
+                        connection.serviceException(e);
+                     } catch (Exception ex) {
+                        ActiveMQServerLogger.LOGGER.debug(ex);
+                     }
                   }
-               }
-            });
+               });
+            }
          }
+      })) {
+         this.connection.getContext().setDontSendReponse(false);
+         connection.enableTtl();
+         throw new ResourceAllocationException("Queue is full " + address);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/30bb9dd0/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 38c59dc..5a290ba 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -127,6 +127,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
 
    boolean checkMemory(Runnable runnable);
 
+   boolean checkMemory(boolean runOnFailure, Runnable runnable);
+
    boolean isFull();
 
    boolean isRejectingMessages();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/30bb9dd0/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 908ab9f..89cd5b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -652,13 +652,17 @@ public class PagingStoreImpl implements PagingStore {
       }
    }
 
-
    @Override
    public boolean checkMemory(final Runnable runWhenAvailable) {
+      return checkMemory(true, runWhenAvailable);
+   }
+
+   @Override
+   public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
 
       if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
          if (isFull()) {
-            if (runWhenAvailable != null) {
+            if (runOnFailure && runWhenAvailable != null) {
                onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
             }
             return false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/30bb9dd0/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 813765d..420c278 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -411,6 +411,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
       }
 
       @Override
+      public boolean checkMemory(boolean runOnFailure, Runnable runnable) {
+         return false;
+      }
+
+      @Override
       public boolean checkMemory(Runnable runnable) {
          return false;
       }


Mime
View raw message