Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C5FB018FC0 for ; Tue, 5 Jan 2016 14:15:09 +0000 (UTC) Received: (qmail 38665 invoked by uid 500); 5 Jan 2016 14:15:09 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 38595 invoked by uid 500); 5 Jan 2016 14:15:09 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 38586 invoked by uid 99); 5 Jan 2016 14:15:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 Jan 2016 14:15:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1C272E03CE; Tue, 5 Jan 2016 14:15:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 05 Jan 2016 14:15:10 -0000 Message-Id: <751ecc80c2524e22924284a99d668b02@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/4] activemq-artemis git commit: ARTEMIS-332 - Duplicate delivery over Bridges under OME scenarios, paging and other failures http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index b0a5a7e..ae93a97 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -415,7 +415,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { backupActivationThread.start(); } else { - ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : "" ); + ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), configuration.getName(), nodeManager.getNodeId(), identity != null ? identity : ""); } // start connector service connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry); @@ -508,18 +508,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { * Stops the server in a different thread. */ public final void stopTheServer(final boolean criticalIOError) { - ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.submit(new Runnable() { + Thread thread = new Thread() { @Override public void run() { try { - stop(false, criticalIOError, false); + ActiveMQServerImpl.this.stop(false, criticalIOError, false); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorStoppingServer(e); } } - }); + }; + + thread.start(); } @Override @@ -722,7 +723,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - pagingManager = null; securityStore = null; resourceManager = null; @@ -1016,7 +1016,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (securityStore != null) { X509Certificate[] certificates = null; if (connection.getTransportConnection() instanceof NettyConnection) { - certificates = CertificateUtil.getCertsFromChannel(((NettyConnection)connection.getTransportConnection()).getChannel()); + certificates = CertificateUtil.getCertsFromChannel(((NettyConnection) connection.getTransportConnection()).getChannel()); } securityStore.authenticate(username, password, certificates); } @@ -1428,7 +1428,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw ActiveMQMessageBundle.BUNDLE.bindingNotDivert(name); } - postOffice.removeBinding(name, null); + postOffice.removeBinding(name, null, true); } @Override @@ -1954,11 +1954,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { boolean failedAlready = false; @Override - public synchronized void onIOException(Exception cause, String message, SequentialFile file) { + public synchronized void onIOException(Throwable cause, String message, SequentialFile file) { if (!failedAlready) { failedAlready = true; - ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); + if (file == null) { + ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause); + } + else { + ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); + } stopTheServer(true); } @@ -2021,10 +2026,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { * move any older data away and log a warning about it. */ void moveServerData() { - File[] dataDirs = new File[]{configuration.getBindingsLocation(), - configuration.getJournalLocation(), - configuration.getPagingLocation(), - configuration.getLargeMessagesLocation()}; + File[] dataDirs = new File[]{configuration.getBindingsLocation(), configuration.getJournalLocation(), configuration.getPagingLocation(), configuration.getLargeMessagesLocation()}; boolean allEmpty = true; int lowestSuffixForMovedData = 1; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index 5420688..c6d5aee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; @@ -66,7 +67,15 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addTail(final MessageReference ref, final boolean direct) { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + SimpleString prop; + + try { + prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + } + catch (ActiveMQException e) { + criticalError(e); + throw new IllegalStateException(e); + } if (prop != null) { HolderReference hr = map.get(prop); @@ -103,45 +112,59 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addHead(final MessageReference ref) { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + try { + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); - if (prop != null) { - HolderReference hr = map.get(prop); + if (prop != null) { + HolderReference hr = map.get(prop); - if (hr != null) { - // We keep the current ref and ack the one we are returning + if (hr != null) { + // We keep the current ref and ack the one we are returning - super.referenceHandled(); + super.referenceHandled(); - try { - super.acknowledge(ref); + try { + super.acknowledge(ref); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + } } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorAckingOldReference(e); + else { + map.put(prop, (HolderReference) ref); + + super.addHead(ref); } } else { - map.put(prop, (HolderReference) ref); - super.addHead(ref); } } - else { - super.addHead(ref); + catch (ActiveMQException e) { + criticalError(e); + throw new IllegalStateException(e); } } @Override protected void refRemoved(MessageReference ref) { - synchronized (this) { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + try { - if (prop != null) { - map.remove(prop); + synchronized (this) { + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + + if (prop != null) { + map.remove(prop); + } } + + super.refRemoved(ref); + } + catch (ActiveMQException e) { + criticalError(e); + throw new IllegalStateException(e); } - super.refRemoved(ref); } private class HolderReference implements MessageReference { @@ -200,7 +223,13 @@ public class LastValueQueue extends QueueImpl { @Override public ServerMessage getMessage() { - return ref.getMessage(); + try { + return ref.getMessage(); + } + catch (ActiveMQException e) { + criticalError(e); + throw new IllegalStateException(e); + } } @Override @@ -256,7 +285,13 @@ public class LastValueQueue extends QueueImpl { */ @Override public int getMessageMemoryEstimate() { - return ref.getMessage().getMemoryEstimate(); + try { + return ref.getMessage().getMemoryEstimate(); + } + catch (ActiveMQException e) { + criticalError(e); + throw new IllegalStateException(e); + } } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 96413f7..fd04b6d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -208,7 +208,7 @@ public class MessageReferenceImpl implements MessageReference { } if (other instanceof MessageReferenceImpl) { - MessageReference reference = (MessageReferenceImpl) other; + MessageReferenceImpl reference = (MessageReferenceImpl) other; if (this.getMessage().equals(reference.getMessage())) return true; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 18eb0b8..c963e4d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; @@ -893,7 +894,7 @@ public class QueueImpl implements Queue { } @Override - public synchronized MessageReference getReference(final long id1) { + public synchronized MessageReference getReference(final long id1) throws ActiveMQException { LinkedListIterator iterator = iterator(); try { @@ -1053,7 +1054,13 @@ public class QueueImpl implements Queue { @Override public void cancel(final Transaction tx, final MessageReference reference, boolean ignoreRedeliveryCheck) { - getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference); + try { + getRefsOperation(tx, ignoreRedeliveryCheck).addAck(reference); + } + catch (ActiveMQException e) { + criticalError(e); + getPageSubscription().getPagingStore().criticalException(e); + } } @Override @@ -1102,7 +1109,7 @@ public class QueueImpl implements Queue { } @Override - public void deliverScheduledMessages() { + public void deliverScheduledMessages() throws ActiveMQException { List scheduledMessages = scheduledDeliveryHandler.cancel(null); if (scheduledMessages != null && scheduledMessages.size() > 0) { for (MessageReference ref : scheduledMessages) { @@ -1311,7 +1318,7 @@ public class QueueImpl implements Queue { Transaction tx = new BindingsTransactionImpl(storageManager); try { - postOffice.removeBinding(name, tx); + postOffice.removeBinding(name, tx, true); deleteAllReferences(); @@ -1770,7 +1777,12 @@ public class QueueImpl implements Queue { private synchronized void internalAddTail(final MessageReference ref) { refAdded(ref); - messageReferences.addTail(ref, ref.getMessage().getPriority()); + try { + messageReferences.addTail(ref, ref.getMessage().getPriority()); + } + catch (ActiveMQException e) { + criticalError(e); + } } /** @@ -1781,9 +1793,18 @@ public class QueueImpl implements Queue { * @param ref */ private void internalAddHead(final MessageReference ref) { - queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); - refAdded(ref); - messageReferences.addHead(ref, ref.getMessage().getPriority()); + try { + queueMemorySize.addAndGet(ref.getMessageMemoryEstimate()); + refAdded(ref); + messageReferences.addHead(ref, ref.getMessage().getPriority()); + } + catch (ActiveMQException e) { + criticalError(e); + } + } + + void criticalError(ActiveMQException e) { + storageManager.criticalError(e); } private synchronized void doInternalPoll() { @@ -2011,14 +2032,17 @@ public class QueueImpl implements Queue { return null; } else { - // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever - return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID); + try { + // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever + return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID); + } + catch (ActiveMQException e) { + criticalError(e); + throw new IllegalStateException(e); + } } } - /** - * @param ref - */ protected void refRemoved(MessageReference ref) { queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate()); if (ref.isPaged()) { @@ -2026,9 +2050,6 @@ public class QueueImpl implements Queue { } } - /** - * @param ref - */ protected void refAdded(final MessageReference ref) { if (ref.isPaged()) { pagedReferences.incrementAndGet(); @@ -2502,23 +2523,29 @@ public class QueueImpl implements Queue { } private boolean checkExpired(final MessageReference reference) { - if (reference.getMessage().isExpired()) { - if (isTrace) { - ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired"); - } - reference.handled(); + try { + if (reference.getMessage().isExpired()) { + if (isTrace) { + ActiveMQServerLogger.LOGGER.trace("Reference " + reference + " is expired"); + } + reference.handled(); - try { - expire(reference); + try { + expire(reference); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorExpiringRef(e); + } + + return true; } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorExpiringRef(e); + else { + return false; } - - return true; } - else { - return false; + catch (ActiveMQException e) { + criticalError(e); + throw new IllegalStateException(e); } } @@ -2557,7 +2584,7 @@ public class QueueImpl implements Queue { } @Override - public void postAcknowledge(final MessageReference ref) { + public void postAcknowledge(final MessageReference ref) throws ActiveMQException { QueueImpl queue = (QueueImpl) ref.getQueue(); queue.decDelivering(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index d117186..92d1a61 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; @@ -25,12 +32,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - public class RefsOperation extends TransactionOperationAbstract { private final StorageManager storageManager; @@ -55,7 +56,7 @@ public class RefsOperation extends TransactionOperationAbstract { ignoreRedeliveryCheck = true; } - synchronized void addAck(final MessageReference ref) { + synchronized void addAck(final MessageReference ref) throws ActiveMQException { refsToAck.add(ref); if (ref.isPaged()) { if (pagedMessagesToPostACK == null) { @@ -147,7 +148,17 @@ public class RefsOperation extends TransactionOperationAbstract { public void afterCommit(final Transaction tx) { for (MessageReference ref : refsToAck) { synchronized (ref.getQueue()) { - queue.postAcknowledge(ref); + try { + queue.postAcknowledge(ref); + } + catch (ActiveMQException e) { + if (queue instanceof QueueImpl) { + ((QueueImpl) queue).criticalError(e); + } + else { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java index f9ee1ce..6b5e2e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; @@ -97,7 +98,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { } @Override - public List cancel(final Filter filter) { + public List cancel(final Filter filter) throws ActiveMQException { List refs = new ArrayList<>(); synchronized (scheduledReferences) { @@ -115,7 +116,7 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler { } @Override - public MessageReference removeReferenceWithID(final long id) { + public MessageReference removeReferenceWithID(final long id) throws ActiveMQException { synchronized (scheduledReferences) { Iterator iter = scheduledReferences.iterator(); while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 4a8b16a..7d54d31 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -286,7 +286,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // should go back into the // queue for delivery later. // TCP-flow control has to be done first than everything else otherwise we may lose notifications - if (!callback.isWritable(this) || !started || transferring ) { + if (!callback.isWritable(this) || !started || transferring) { return HandleStatus.BUSY; } @@ -733,25 +733,63 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } - @Override - public void individualAcknowledge(final Transaction tx, final long messageID) throws Exception { + public void individualAcknowledge(Transaction tx, + final long messageID) throws Exception { if (browseOnly) { return; } - MessageReference ref = removeReferenceByID(messageID); + boolean startedTransaction = false; - if (ref == null) { - ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName()); - if (tx != null) { - tx.markAsRollbackOnly(ils); - } - throw ils; + if (tx == null) { + startedTransaction = true; + tx = new TransactionImpl(storageManager); } - ackReference(tx, ref); + try { + + MessageReference ref; + ref = removeReferenceByID(messageID); + + if (ActiveMQServerLogger.LOGGER.isTraceEnabled()) { + ActiveMQServerLogger.LOGGER.trace("ACKing ref " + ref + " on tx= " + tx + ", consumer=" + this); + } + + if (ref == null) { + ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID); + if (tx != null) { + tx.markAsRollbackOnly(ils); + } + throw ils; + } + + ackReference(tx, ref); + + if (startedTransaction) { + tx.commit(); + } + } + catch (ActiveMQException e) { + if (startedTransaction) { + tx.rollback(); + } + else { + tx.markAsRollbackOnly(e); + } + throw e; + } + catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorAckingMessage((Exception) e); + ActiveMQIllegalStateException hqex = new ActiveMQIllegalStateException(e.getMessage()); + if (startedTransaction) { + tx.rollback(); + } + else { + tx.markAsRollbackOnly(hqex); + } + throw hqex; + } - acks++; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index e21102c..ebe2f8a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -316,6 +316,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } @Override + public void markTXFailed(Throwable e) { + Transaction currentTX = this.tx; + if (currentTX != null) { + if (e instanceof ActiveMQException) { + currentTX.markAsRollbackOnly((ActiveMQException) e); + } + else { + ActiveMQException exception = new ActiveMQException(e.getMessage()); + exception.initCause(e); + currentTX.markAsRollbackOnly(exception); + } + } + } + + @Override public boolean removeConsumer(final long consumerID) throws Exception { return consumers.remove(consumerID) != null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 49dcbe8..3f726f0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -229,7 +229,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } } - private void validateSequence(ScheduledDeliveryHandlerImpl handler) { + private void validateSequence(ScheduledDeliveryHandlerImpl handler) throws Exception { long lastSequence = -1; for (MessageReference ref : handler.getScheduledReferences()) { assertEquals(lastSequence + 1, ref.getMessage().getMessageID()); @@ -256,7 +256,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { handler.checkAndSchedule(refImpl, tail); } - private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) { + private void debugList(boolean fail, ScheduledDeliveryHandlerImpl handler, long numberOfExpectedMessages) throws Exception { List refs = handler.getScheduledReferences(); HashSet messages = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 3909c3c..6c5cfe5 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -208,6 +208,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override + public void criticalError(Throwable error) { + error.printStackTrace(); + } + + @Override public OperationContext newContext(Executor executor) { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java index 05a48e9..5fe8953 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java @@ -16,10 +16,16 @@ */ package org.apache.activemq.artemis.tests.integration; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -37,10 +43,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; - public class DuplicateDetectionTest extends ActiveMQTestBase { private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; @@ -213,6 +215,75 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { Assert.assertEquals(0, ((PostOfficeImpl) server.getPostOffice()).getDuplicateIDCaches().size()); } + // It is important to test the shrink with this rule + // because we could have this after crashes + // we would eventually have a higher number of caches while we couldn't have time to clear previous ones + @Test + public void testShrinkCache() throws Exception { + server.stop(); + server.getConfiguration().setIDCacheSize(150); + server.start(); + + final int TEST_SIZE = 200; + + ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + + locator.setBlockOnNonDurableSend(true); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true); + + session.start(); + + final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue"); + session.createQueue(queueName, queueName, null, true); + + ClientProducer producer = session.createProducer(queueName); + + for (int i = 0; i < TEST_SIZE; i++) { + ClientMessage message = session.createMessage(true); + message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i)); + producer.send(message); + } + session.commit(); + + sf.close(); + session.close(); + locator.close(); + + server.stop(); + + server.getConfiguration().setIDCacheSize(100); + + server.start(); + + locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + + locator.setBlockOnNonDurableSend(true); + sf = createSessionFactory(locator); + session = sf.createSession(false, false, false); + session.start(); + + producer = session.createProducer(queueName); + + // will send the last 50 again + for (int i = TEST_SIZE - 50; i < TEST_SIZE; i++) { + ClientMessage message = session.createMessage(true); + message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString("DUPL-" + i)); + producer.send(message); + } + + try { + session.commit(); + Assert.fail("Exception expected"); + } + catch (ActiveMQException expected) { + + } + + } + @Test public void testSimpleDuplicateDetectionWithString() throws Exception { ClientSession session = sf.createSession(false, true, true); @@ -1240,176 +1311,6 @@ public class DuplicateDetectionTest extends ActiveMQTestBase { } @Test - public void testDuplicateCachePersistedRestartWithSmallerCache() throws Exception { - server.stop(); - - final int initialCacheSize = 10; - final int subsequentCacheSize = 5; - - config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize); - - server = createServer(config); - - server.start(); - - sf = createSessionFactory(locator); - - ClientSession session = sf.createSession(false, true, true); - - session.start(); - - final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue"); - - session.createQueue(queueName, queueName, null, false); - - ClientProducer producer = session.createProducer(queueName); - - ClientConsumer consumer = session.createConsumer(queueName); - - for (int i = 0; i < initialCacheSize; i++) { - ClientMessage message = createMessage(session, i); - SimpleString dupID = new SimpleString("abcdefg" + i); - message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData()); - producer.send(message); - ClientMessage message2 = consumer.receive(1000); - Assert.assertEquals(i, message2.getObjectProperty(propKey)); - } - - session.close(); - - sf.close(); - - server.stop(); - - waitForServerToStop(server); - - config.setIDCacheSize(subsequentCacheSize); - - server = createServer(config); - - server.start(); - - sf = createSessionFactory(locator); - - session = sf.createSession(false, true, true); - - session.start(); - - session.createQueue(queueName, queueName, null, false); - - producer = session.createProducer(queueName); - - consumer = session.createConsumer(queueName); - - for (int i = 0; i < initialCacheSize; i++) { - ClientMessage message = createMessage(session, i); - SimpleString dupID = new SimpleString("abcdefg" + i); - message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData()); - producer.send(message); - if (i >= subsequentCacheSize) { - // Message should get through - ClientMessage message2 = consumer.receive(1000); - Assert.assertEquals(i, message2.getObjectProperty(propKey)); - } - else { - ClientMessage message2 = consumer.receiveImmediate(); - Assert.assertNull(message2); - } - } - } - - @Test - public void testDuplicateCachePersistedRestartWithSmallerCacheEnsureDeleted() throws Exception { - server.stop(); - - final int initialCacheSize = 10; - final int subsequentCacheSize = 5; - - config = createDefaultInVMConfig().setIDCacheSize(initialCacheSize); - - server = createServer(config); - - server.start(); - - sf = createSessionFactory(locator); - - ClientSession session = sf.createSession(false, true, true); - - session.start(); - - final SimpleString queueName = new SimpleString("DuplicateDetectionTestQueue"); - - session.createQueue(queueName, queueName, null, false); - - ClientProducer producer = session.createProducer(queueName); - - ClientConsumer consumer = session.createConsumer(queueName); - - for (int i = 0; i < initialCacheSize; i++) { - ClientMessage message = createMessage(session, i); - SimpleString dupID = new SimpleString("abcdefg" + i); - message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData()); - producer.send(message); - ClientMessage message2 = consumer.receive(1000); - Assert.assertEquals(i, message2.getObjectProperty(propKey)); - } - - session.close(); - - sf.close(); - - server.stop(); - - waitForServerToStop(server); - - config.setIDCacheSize(subsequentCacheSize); - - server = createServer(config); - - server.start(); - - // Now stop and set back to original cache size and restart - - server.stop(); - - waitForServerToStop(server); - - config.setIDCacheSize(initialCacheSize); - - server = createServer(config); - - server.start(); - - sf = createSessionFactory(locator); - - session = sf.createSession(false, true, true); - - session.start(); - - session.createQueue(queueName, queueName, null, false); - - producer = session.createProducer(queueName); - - consumer = session.createConsumer(queueName); - - for (int i = 0; i < initialCacheSize; i++) { - ClientMessage message = createMessage(session, i); - SimpleString dupID = new SimpleString("abcdefg" + i); - message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, dupID.getData()); - producer.send(message); - if (i >= subsequentCacheSize) { - // Message should get through - ClientMessage message2 = consumer.receive(1000); - Assert.assertEquals(i, message2.getObjectProperty(propKey)); - } - else { - ClientMessage message2 = consumer.receiveImmediate(); - Assert.assertNull(message2); - } - } - } - - @Test public void testNoPersist() throws Exception { server.stop(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java ---------------------------------------------------------------------- diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 6351357..6244330 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -257,6 +257,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { } @Override + public void criticalException(Throwable e) { + } + + @Override public int getNumberOfPages() { return 0; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 59d2646..5f02cf9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -782,6 +782,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase { static final class FakeStoreFactory implements PagingStoreFactory { + @Override + public void criticalException(Throwable e) { + } + final SequentialFileFactory factory; public FakeStoreFactory() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java index 30e302e..c47041a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java @@ -106,7 +106,7 @@ public class DuplicateDetectionUnitTest extends ActiveMQTestBase { DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true); for (int i = 0; i < 100; i++) { - cacheID.addToCache(RandomUtil.randomBytes(), null); + cacheID.addToCache(RandomUtil.randomBytes()); } journal.stop(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java index a7be2fa..81015e4 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java @@ -93,41 +93,47 @@ public class FakeConsumer implements Consumer { @Override public synchronized HandleStatus handle(final MessageReference reference) { - if (statusToReturn == HandleStatus.BUSY) { - return HandleStatus.BUSY; - } - - if (filter != null) { - if (filter.match(reference.getMessage())) { - references.addLast(reference); - reference.getQueue().referenceHandled(); - notify(); - - return HandleStatus.HANDLED; + try { + if (statusToReturn == HandleStatus.BUSY) { + return HandleStatus.BUSY; } - else { - return HandleStatus.NO_MATCH; + + if (filter != null) { + if (filter.match(reference.getMessage())) { + references.addLast(reference); + reference.getQueue().referenceHandled(); + notify(); + + return HandleStatus.HANDLED; + } + else { + return HandleStatus.NO_MATCH; + } } - } - if (newStatus != null) { - if (delayCountdown == 0) { - statusToReturn = newStatus; + if (newStatus != null) { + if (delayCountdown == 0) { + statusToReturn = newStatus; - newStatus = null; + newStatus = null; + } + else { + delayCountdown--; + } } - else { - delayCountdown--; + + if (statusToReturn == HandleStatus.HANDLED) { + reference.getQueue().referenceHandled(); + references.addLast(reference); + notify(); } - } - if (statusToReturn == HandleStatus.HANDLED) { - reference.getQueue().referenceHandled(); - references.addLast(reference); - notify(); + return statusToReturn; + } + catch (Exception e) { + e.printStackTrace(); + throw new IllegalStateException(e.getMessage(), e); } - - return statusToReturn; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96849a42/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 27d9c33..4f8a007 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -113,8 +113,7 @@ public class FakePostOffice implements PostOffice { } @Override - public Binding removeBinding(final SimpleString uniqueName, final Transaction tx) throws Exception { - + public Binding removeBinding(SimpleString uniqueName, Transaction tx, boolean deleteData) throws Exception { return null; }