activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/4] activemq-artemis git commit: ARTEMIS-332 - Duplicate delivery over Bridges under OME scenarios, paging and other failures
Date Tue, 05 Jan 2016 14:15:10 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index b0a5a7e..ae93a97 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -415,7 +415,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             backupActivationThread.start();
          }
          else {
-            ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(),
nodeManager.getNodeId(),  identity != null ? identity : "" );
+            ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(),
nodeManager.getNodeId(), identity != null ? identity : "");
          }
          // start connector service
          connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool,
postOffice, serviceRegistry);
@@ -508,18 +508,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
     * Stops the server in a different thread.
     */
    public final void stopTheServer(final boolean criticalIOError) {
-      ExecutorService executor = Executors.newSingleThreadExecutor();
-      executor.submit(new Runnable() {
+      Thread thread = new Thread() {
          @Override
          public void run() {
             try {
-               stop(false, criticalIOError, false);
+               ActiveMQServerImpl.this.stop(false, criticalIOError, false);
             }
             catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
             }
          }
-      });
+      };
+
+      thread.start();
    }
 
    @Override
@@ -722,7 +723,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
-
       pagingManager = null;
       securityStore = null;
       resourceManager = null;
@@ -1016,7 +1016,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       if (securityStore != null) {
          X509Certificate[] certificates = null;
          if (connection.getTransportConnection() instanceof NettyConnection) {
-            certificates = CertificateUtil.getCertsFromChannel(((NettyConnection)connection.getTransportConnection()).getChannel());
+            certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel());
          }
          securityStore.authenticate(username, password, certificates);
       }
@@ -1428,7 +1428,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          throw ActiveMQMessageBundle.BUNDLE.bindingNotDivert(name);
       }
 
-      postOffice.removeBinding(name, null);
+      postOffice.removeBinding(name, null, true);
    }
 
    @Override
@@ -1954,11 +1954,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       boolean failedAlready = false;
 
       @Override
-      public synchronized void onIOException(Exception cause, String message, SequentialFile
file) {
+      public synchronized void onIOException(Throwable cause, String message, SequentialFile
file) {
          if (!failedAlready) {
             failedAlready = true;
 
-            ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
+            if (file == null) {
+               ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
+            }
+            else {
+               ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
+            }
 
             stopTheServer(true);
          }
@@ -2021,10 +2026,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
     * move any older data away and log a warning about it.
     */
    void moveServerData() {
-      File[] dataDirs = new File[]{configuration.getBindingsLocation(),
-                                   configuration.getJournalLocation(),
-                                   configuration.getPagingLocation(),
-                                   configuration.getLargeMessagesLocation()};
+      File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(),
configuration.getPagingLocation(), configuration.getLargeMessagesLocation()};
 
       boolean allEmpty = true;
       int lowestSuffixForMovedData = 1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 5420688..c6d5aee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -66,7 +67,15 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addTail(final MessageReference ref, final boolean direct) {
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      SimpleString prop;
+
+      try {
+         prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
+      }
 
       if (prop != null) {
          HolderReference hr = map.get(prop);
@@ -103,45 +112,59 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addHead(final MessageReference ref) {
-      SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      try {
+         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
 
-      if (prop != null) {
-         HolderReference hr = map.get(prop);
+         if (prop != null) {
+            HolderReference hr = map.get(prop);
 
-         if (hr != null) {
-            // We keep the current ref and ack the one we are returning
+            if (hr != null) {
+               // We keep the current ref and ack the one we are returning
 
-            super.referenceHandled();
+               super.referenceHandled();
 
-            try {
-               super.acknowledge(ref);
+               try {
+                  super.acknowledge(ref);
+               }
+               catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+               }
             }
-            catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+            else {
+               map.put(prop, (HolderReference) ref);
+
+               super.addHead(ref);
             }
          }
          else {
-            map.put(prop, (HolderReference) ref);
-
             super.addHead(ref);
          }
       }
-      else {
-         super.addHead(ref);
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
       }
    }
 
    @Override
    protected void refRemoved(MessageReference ref) {
-      synchronized (this) {
-         SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+      try {
 
-         if (prop != null) {
-            map.remove(prop);
+         synchronized (this) {
+            SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
+
+            if (prop != null) {
+               map.remove(prop);
+            }
          }
+
+         super.refRemoved(ref);
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
       }
 
-      super.refRemoved(ref);
    }
 
    private class HolderReference implements MessageReference {
@@ -200,7 +223,13 @@ public class LastValueQueue extends QueueImpl {
 
       @Override
       public ServerMessage getMessage() {
-         return ref.getMessage();
+         try {
+            return ref.getMessage();
+         }
+         catch (ActiveMQException e) {
+            criticalError(e);
+            throw new IllegalStateException(e);
+         }
       }
 
       @Override
@@ -256,7 +285,13 @@ public class LastValueQueue extends QueueImpl {
        */
       @Override
       public int getMessageMemoryEstimate() {
-         return ref.getMessage().getMemoryEstimate();
+         try {
+            return ref.getMessage().getMemoryEstimate();
+         }
+         catch (ActiveMQException e) {
+            criticalError(e);
+            throw new IllegalStateException(e);
+         }
       }
 
       /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 96413f7..fd04b6d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -208,7 +208,7 @@ public class MessageReferenceImpl implements MessageReference {
       }
 
       if (other instanceof MessageReferenceImpl) {
-         MessageReference reference = (MessageReferenceImpl) other;
+         MessageReferenceImpl reference = (MessageReferenceImpl) other;
 
          if (this.getMessage().equals(reference.getMessage()))
             return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 18eb0b8..c963e4d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -893,7 +894,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public synchronized MessageReference getReference(final long id1) {
+   public synchronized MessageReference getReference(final long id1) throws ActiveMQException
{
       LinkedListIterator<MessageReference> iterator = iterator();
 
       try {
@@ -1053,7 +1054,13 @@ public class QueueImpl implements Queue {
 
    @Override
    public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck)
{
-      getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+      try {
+         getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference);
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+         getPageSubscription().getPagingStore().criticalException(e);
+      }
    }
 
    @Override
@@ -1102,7 +1109,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public void deliverScheduledMessages() {
+   public void deliverScheduledMessages() throws ActiveMQException {
       List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
       if (scheduledMessages != null && scheduledMessages.size() > 0) {
          for (MessageReference ref : scheduledMessages) {
@@ -1311,7 +1318,7 @@ public class QueueImpl implements Queue {
       Transaction tx = new BindingsTransactionImpl(storageManager);
 
       try {
-         postOffice.removeBinding(name, tx);
+         postOffice.removeBinding(name, tx, true);
 
          deleteAllReferences();
 
@@ -1770,7 +1777,12 @@ public class QueueImpl implements Queue {
 
    private synchronized void internalAddTail(final MessageReference ref) {
       refAdded(ref);
-      messageReferences.addTail(ref, ref.getMessage().getPriority());
+      try {
+         messageReferences.addTail(ref, ref.getMessage().getPriority());
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+      }
    }
 
    /**
@@ -1781,9 +1793,18 @@ public class QueueImpl implements Queue {
     * @param ref
     */
    private void internalAddHead(final MessageReference ref) {
-      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
-      refAdded(ref);
-      messageReferences.addHead(ref, ref.getMessage().getPriority());
+      try {
+         queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+         refAdded(ref);
+         messageReferences.addHead(ref, ref.getMessage().getPriority());
+      }
+      catch (ActiveMQException e) {
+         criticalError(e);
+      }
+   }
+
+   void criticalError(ActiveMQException e) {
+      storageManager.criticalError(e);
    }
 
    private synchronized void doInternalPoll() {
@@ -2011,14 +2032,17 @@ public class QueueImpl implements Queue {
          return null;
       }
       else {
-         // But we don't use the groupID on internal queues (clustered queues) otherwise
the group map would leak forever
-         return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+         try {
+            // But we don't use the groupID on internal queues (clustered queues) otherwise
the group map would leak forever
+            return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+         }
+         catch (ActiveMQException e) {
+            criticalError(e);
+            throw new IllegalStateException(e);
+         }
       }
    }
 
-   /**
-    * @param ref
-    */
    protected void refRemoved(MessageReference ref) {
       queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
       if (ref.isPaged()) {
@@ -2026,9 +2050,6 @@ public class QueueImpl implements Queue {
       }
    }
 
-   /**
-    * @param ref
-    */
    protected void refAdded(final MessageReference ref) {
       if (ref.isPaged()) {
          pagedReferences.incrementAndGet();
@@ -2502,23 +2523,29 @@ public class QueueImpl implements Queue {
    }
 
    private boolean checkExpired(final MessageReference reference) {
-      if (reference.getMessage().isExpired()) {
-         if (isTrace) {
-            ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired");
-         }
-         reference.handled();
+      try {
+         if (reference.getMessage().isExpired()) {
+            if (isTrace) {
+               ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired");
+            }
+            reference.handled();
 
-         try {
-            expire(reference);
+            try {
+               expire(reference);
+            }
+            catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.errorExpiringRef(e);
+            }
+
+            return true;
          }
-         catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.errorExpiringRef(e);
+         else {
+            return false;
          }
-
-         return true;
       }
-      else {
-         return false;
+      catch (ActiveMQException e) {
+         criticalError(e);
+         throw new IllegalStateException(e);
       }
    }
 
@@ -2557,7 +2584,7 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public void postAcknowledge(final MessageReference ref) {
+   public void postAcknowledge(final MessageReference ref) throws ActiveMQException {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
       queue.decDelivering();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index d117186..92d1a61 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -25,12 +32,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 public class RefsOperation extends TransactionOperationAbstract {
 
    private final StorageManager storageManager;
@@ -55,7 +56,7 @@ public class RefsOperation extends TransactionOperationAbstract {
       ignoreRedeliveryCheck = true;
    }
 
-   synchronized void addAck(final MessageReference ref) {
+   synchronized void addAck(final MessageReference ref) throws ActiveMQException {
       refsToAck.add(ref);
       if (ref.isPaged()) {
          if (pagedMessagesToPostACK == null) {
@@ -147,7 +148,17 @@ public class RefsOperation extends TransactionOperationAbstract {
    public void afterCommit(final Transaction tx) {
       for (MessageReference ref : refsToAck) {
          synchronized (ref.getQueue()) {
-            queue.postAcknowledge(ref);
+            try {
+               queue.postAcknowledge(ref);
+            }
+            catch (ActiveMQException e) {
+               if (queue instanceof QueueImpl) {
+                  ((QueueImpl) queue).criticalError(e);
+               }
+               else {
+                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+               }
+            }
          }
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index f9ee1ce..6b5e2e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -97,7 +98,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
{
    }
 
    @Override
-   public List<MessageReference> cancel(final Filter filter) {
+   public List<MessageReference> cancel(final Filter filter) throws ActiveMQException
{
       List<MessageReference> refs = new ArrayList<>();
 
       synchronized (scheduledReferences) {
@@ -115,7 +116,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler
{
    }
 
    @Override
-   public MessageReference removeReferenceWithID(final long id) {
+   public MessageReference removeReferenceWithID(final long id) throws ActiveMQException
{
       synchronized (scheduledReferences) {
          Iterator<RefScheduled> iter = scheduledReferences.iterator();
          while (iter.hasNext()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 4a8b16a..7d54d31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -286,7 +286,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
          // should go back into the
          // queue for delivery later.
          // TCP-flow control has to be done first than everything else otherwise we may lose
notifications
-         if (!callback.isWritable(this) || !started || transferring ) {
+         if (!callback.isWritable(this) || !started || transferring) {
             return HandleStatus.BUSY;
          }
 
@@ -733,25 +733,63 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
       }
    }
 
-   @Override
-   public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception
{
+   public void individualAcknowledge(Transaction tx,
+                                     final long messageID) throws Exception {
       if (browseOnly) {
          return;
       }
 
-      MessageReference ref = removeReferenceByID(messageID);
+      boolean startedTransaction = false;
 
-      if (ref == null) {
-         ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id,
messageID, messageQueue.getName());
-         if (tx != null) {
-            tx.markAsRollbackOnly(ils);
-         }
-         throw ils;
+      if (tx == null) {
+         startedTransaction = true;
+         tx = new TransactionImpl(storageManager);
       }
 
-      ackReference(tx, ref);
+      try {
+
+         MessageReference ref;
+         ref = removeReferenceByID(messageID);
+
+         if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) {
+            ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx + ",
consumer=" + this);
+         }
+
+         if (ref == null) {
+            ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot
find ref to ack " + messageID);
+            if (tx != null) {
+               tx.markAsRollbackOnly(ils);
+            }
+            throw ils;
+         }
+
+         ackReference(tx, ref);
+
+         if (startedTransaction) {
+            tx.commit();
+         }
+      }
+      catch (ActiveMQException e) {
+         if (startedTransaction) {
+            tx.rollback();
+         }
+         else {
+            tx.markAsRollbackOnly(e);
+         }
+         throw e;
+      }
+      catch (Throwable e) {
+         ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e);
+         ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage());
+         if (startedTransaction) {
+            tx.rollback();
+         }
+         else {
+            tx.markAsRollbackOnly(hqex);
+         }
+         throw hqex;
+      }
 
-      acks++;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index e21102c..ebe2f8a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -316,6 +316,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener
{
    }
 
    @Override
+   public void markTXFailed(Throwable e) {
+      Transaction currentTX = this.tx;
+      if (currentTX != null) {
+         if (e instanceof ActiveMQException) {
+            currentTX.markAsRollbackOnly((ActiveMQException) e);
+         }
+         else {
+            ActiveMQException exception = new ActiveMQException(e.getMessage());
+            exception.initCause(e);
+            currentTX.markAsRollbackOnly(exception);
+         }
+      }
+   }
+
+   @Override
    public boolean removeConsumer(final long consumerID) throws Exception {
       return consumers.remove(consumerID) != null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 49dcbe8..3f726f0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -229,7 +229,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
    }
 
-   private void validateSequence(ScheduledDeliveryHandlerImpl handler) {
+   private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception {
       long lastSequence = -1;
       for (MessageReference ref : handler.getScheduledReferences()) {
          assertEquals(lastSequence + 1, ref.getMessage().getMessageID());
@@ -256,7 +256,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       handler.checkAndSchedule(refImpl, tail);
    }
 
-   private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages)
{
+   private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages)
throws Exception {
       List<MessageReference> refs = handler.getScheduledReferences();
 
       HashSet<Long> messages = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index 3909c3c..6c5cfe5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -208,6 +208,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void criticalError(Throwable error) {
+         error.printStackTrace();
+      }
+
+      @Override
       public OperationContext newContext(Executor executor) {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
index 05a48e9..5fe8953 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
@@ -16,10 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -37,10 +43,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
 public class DuplicateDetectionTest extends ActiveMQTestBase {
 
    private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@@ -213,6 +215,75 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
       Assert.assertEquals(0, ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().size());
    }
 
+   // It is important to test the shrink with this rule
+   // because we could have this after crashes
+   // we would eventually have a higher number of caches while we couldn't have time to clear
previous ones
+   @Test
+   public void testShrinkCache() throws Exception {
+      server.stop();
+      server.getConfiguration().setIDCacheSize(150);
+      server.start();
+
+      final int TEST_SIZE = 200;
+
+      ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      locator.setBlockOnNonDurableSend(true);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.start();
+
+      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
+      session.createQueue(queueName, queueName, null, true);
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      for (int i = 0; i < TEST_SIZE; i++) {
+         ClientMessage message = session.createMessage(true);
+         message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-"
+ i));
+         producer.send(message);
+      }
+      session.commit();
+
+      sf.close();
+      session.close();
+      locator.close();
+
+      server.stop();
+
+      server.getConfiguration().setIDCacheSize(100);
+
+      server.start();
+
+      locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+      locator.setBlockOnNonDurableSend(true);
+      sf = createSessionFactory(locator);
+      session = sf.createSession(false, false, false);
+      session.start();
+
+      producer = session.createProducer(queueName);
+
+      // will send the last 50 again
+      for (int i = TEST_SIZE - 50; i < TEST_SIZE; i++) {
+         ClientMessage message = session.createMessage(true);
+         message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-"
+ i));
+         producer.send(message);
+      }
+
+      try {
+         session.commit();
+         Assert.fail("Exception expected");
+      }
+      catch (ActiveMQException expected) {
+
+      }
+
+   }
+
    @Test
    public void testSimpleDuplicateDetectionWithString() throws Exception {
       ClientSession session = sf.createSession(false, true, true);
@@ -1240,176 +1311,6 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testDuplicateCachePersistedRestartWithSmallerCache() throws Exception {
-      server.stop();
-
-      final int initialCacheSize = 10;
-      final int subsequentCacheSize = 5;
-
-      config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true);
-
-      session.start();
-
-      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
-
-      session.createQueue(queueName, queueName, null, false);
-
-      ClientProducer producer = session.createProducer(queueName);
-
-      ClientConsumer consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         ClientMessage message2 = consumer.receive(1000);
-         Assert.assertEquals(i, message2.getObjectProperty(propKey));
-      }
-
-      session.close();
-
-      sf.close();
-
-      server.stop();
-
-      waitForServerToStop(server);
-
-      config.setIDCacheSize(subsequentCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      session = sf.createSession(false, true, true);
-
-      session.start();
-
-      session.createQueue(queueName, queueName, null, false);
-
-      producer = session.createProducer(queueName);
-
-      consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         if (i >= subsequentCacheSize) {
-            // Message should get through
-            ClientMessage message2 = consumer.receive(1000);
-            Assert.assertEquals(i, message2.getObjectProperty(propKey));
-         }
-         else {
-            ClientMessage message2 = consumer.receiveImmediate();
-            Assert.assertNull(message2);
-         }
-      }
-   }
-
-   @Test
-   public void testDuplicateCachePersistedRestartWithSmallerCacheEnsureDeleted() throws Exception
{
-      server.stop();
-
-      final int initialCacheSize = 10;
-      final int subsequentCacheSize = 5;
-
-      config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      ClientSession session = sf.createSession(false, true, true);
-
-      session.start();
-
-      final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue");
-
-      session.createQueue(queueName, queueName, null, false);
-
-      ClientProducer producer = session.createProducer(queueName);
-
-      ClientConsumer consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         ClientMessage message2 = consumer.receive(1000);
-         Assert.assertEquals(i, message2.getObjectProperty(propKey));
-      }
-
-      session.close();
-
-      sf.close();
-
-      server.stop();
-
-      waitForServerToStop(server);
-
-      config.setIDCacheSize(subsequentCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      // Now stop and set back to original cache size and restart
-
-      server.stop();
-
-      waitForServerToStop(server);
-
-      config.setIDCacheSize(initialCacheSize);
-
-      server = createServer(config);
-
-      server.start();
-
-      sf = createSessionFactory(locator);
-
-      session = sf.createSession(false, true, true);
-
-      session.start();
-
-      session.createQueue(queueName, queueName, null, false);
-
-      producer = session.createProducer(queueName);
-
-      consumer = session.createConsumer(queueName);
-
-      for (int i = 0; i < initialCacheSize; i++) {
-         ClientMessage message = createMessage(session, i);
-         SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
-         producer.send(message);
-         if (i >= subsequentCacheSize) {
-            // Message should get through
-            ClientMessage message2 = consumer.receive(1000);
-            Assert.assertEquals(i, message2.getObjectProperty(propKey));
-         }
-         else {
-            ClientMessage message2 = consumer.receiveImmediate();
-            Assert.assertNull(message2);
-         }
-      }
-   }
-
-   @Test
    public void testNoPersist() throws Exception {
       server.stop();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 6351357..6244330 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -257,6 +257,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
       }
 
       @Override
+      public void criticalException(Throwable e) {
+      }
+
+      @Override
       public int getNumberOfPages() {
          return 0;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 59d2646..5f02cf9 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -782,6 +782,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
    static final class FakeStoreFactory implements PagingStoreFactory {
 
+      @Override
+      public void criticalException(Throwable e) {
+      }
+
       final SequentialFileFactory factory;
 
       public FakeStoreFactory() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
index 30e302e..c47041a 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
@@ -106,7 +106,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase {
          DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
 
          for (int i = 0; i < 100; i++) {
-            cacheID.addToCache(RandomUtil.randomBytes(), null);
+            cacheID.addToCache(RandomUtil.randomBytes());
          }
 
          journal.stop();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index a7be2fa..81015e4 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -93,41 +93,47 @@ public class FakeConsumer implements Consumer {
 
    @Override
    public synchronized HandleStatus handle(final MessageReference reference) {
-      if (statusToReturn == HandleStatus.BUSY) {
-         return HandleStatus.BUSY;
-      }
-
-      if (filter != null) {
-         if (filter.match(reference.getMessage())) {
-            references.addLast(reference);
-            reference.getQueue().referenceHandled();
-            notify();
-
-            return HandleStatus.HANDLED;
+      try {
+         if (statusToReturn == HandleStatus.BUSY) {
+            return HandleStatus.BUSY;
          }
-         else {
-            return HandleStatus.NO_MATCH;
+
+         if (filter != null) {
+            if (filter.match(reference.getMessage())) {
+               references.addLast(reference);
+               reference.getQueue().referenceHandled();
+               notify();
+
+               return HandleStatus.HANDLED;
+            }
+            else {
+               return HandleStatus.NO_MATCH;
+            }
          }
-      }
 
-      if (newStatus != null) {
-         if (delayCountdown == 0) {
-            statusToReturn = newStatus;
+         if (newStatus != null) {
+            if (delayCountdown == 0) {
+               statusToReturn = newStatus;
 
-            newStatus = null;
+               newStatus = null;
+            }
+            else {
+               delayCountdown--;
+            }
          }
-         else {
-            delayCountdown--;
+
+         if (statusToReturn == HandleStatus.HANDLED) {
+            reference.getQueue().referenceHandled();
+            references.addLast(reference);
+            notify();
          }
-      }
 
-      if (statusToReturn == HandleStatus.HANDLED) {
-         reference.getQueue().referenceHandled();
-         references.addLast(reference);
-         notify();
+         return statusToReturn;
+      }
+      catch (Exception e) {
+         e.printStackTrace();
+         throw new IllegalStateException(e.getMessage(), e);
       }
-
-      return statusToReturn;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 27d9c33..4f8a007 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -113,8 +113,7 @@ public class FakePostOffice implements PostOffice {
    }
 
    @Override
-   public Binding removeBinding(final SimpleString uniqueName, final Transaction tx) throws
Exception {
-
+   public Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData)
throws Exception {
       return null;
    }
 


Mime
View raw message