activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup
Date Thu, 29 Jun 2017 16:55:20 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 70f6a2956 -> aa932141f


ARTEMIS-1221 Duplicated ID causes LargeMessage lost at backup

When a large message is replicated to backup, a pendingID is generated
when the large message is finished. This pendingID is generated by a
BatchingIDGenerator at backup.

It is possible that a pendingID generated at the backup duplicates
an ID generated at the live server.

This causes a problem when a large message whose messageID is the
same as another large message's pendingID is replicated and stored
in the backup's journal, and then a deleteRecord for that pendingID
is appended. If the backup becomes live and loads the journal, it will
drop the large message's add record because there is a deleteRecord with
the same ID (even though it is the pendingID of another message).
As a result, the client expecting this large message will never receive it.

So in summary, the root cause is that the pendingIDs for large
messages are generated at the backup while the backup is not yet live.

The solution is that, instead of the backup generating
the pendingIDs, they are all generated in advance at the
live server and replicated to the backup wherever needed.
The ID generator at the backup is only used once the backup becomes live
(when it has been properly initialized from the journal).


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d50f577c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d50f577c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d50f577c

Branch: refs/heads/master
Commit: d50f577cd50df37634f592db65200861fe3e13d3
Parents: 70f6a29
Author: Howard Gao <howard.gao@gmail.com>
Authored: Thu Jun 29 00:03:47 2017 +0800
Committer: Howard Gao <howard.gao@gmail.com>
Committed: Thu Jun 29 00:03:47 2017 +0800

----------------------------------------------------------------------
 .../journal/AbstractJournalStorageManager.java  |   2 +-
 .../impl/journal/JournalStorageManager.java     |  19 ++-
 .../impl/journal/LargeServerMessageImpl.java    |   2 +-
 .../impl/journal/LargeServerMessageInSync.java  |  10 ++
 .../ReplicationLargeMessageEndMessage.java      |  19 ++-
 .../replication/ReplicatedLargeMessage.java     |   4 +
 .../core/replication/ReplicationEndpoint.java   |   1 +
 .../core/replication/ReplicationManager.java    |   7 +-
 .../artemis/core/server/LargeServerMessage.java |   6 +-
 .../failover/FailoverTestWithDivert.java        | 148 +++++++++++++++++++
 .../replication/ReplicationTest.java            |   4 +-
 11 files changed, 203 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 4ba8e82..0eb1dc3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1565,7 +1565,7 @@ public abstract class AbstractJournalStorageManager implements StorageManager
{
       if (largeServerMessage.getPendingRecordID() >= 0) {
          try {
             confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
-            largeServerMessage.setPendingRecordID(-1);
+            largeServerMessage.setPendingRecordID(LargeServerMessage.NO_PENDING_ID);
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 803c441..ba7bb86 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -274,7 +274,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
             ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, largeMsgId);
          }
          if (replicator != null) {
-            replicator.largeMessageDelete(largeMsgId);
+            replicator.largeMessageDelete(largeMsgId, JournalStorageManager.this);
          }
       }
       largeMessagesToDelete.clear();
@@ -375,10 +375,17 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
       journalFF.releaseBuffer(buffer);
    }
 
-   public long storePendingLargeMessage(final long messageID) throws Exception {
+   public long storePendingLargeMessage(final long messageID, long recordID) throws Exception
{
       readLock();
       try {
-         long recordID = generateID();
+         if (recordID == LargeServerMessage.NO_PENDING_ID) {
+            recordID = generateID();
+         } else {
+            //this means the large message doesn't
+            //have a pendingRecordID, but one has been
+            //generated (coming from live server) for use.
+            recordID = -recordID;
+         }
 
          messageJournal.appendAddRecord(recordID, JournalRecordIds.ADD_LARGE_MESSAGE_PENDING,
new PendingLargeMessageEncoding(messageID), true, getContext(true));
 
@@ -396,7 +403,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
             // And the client won't be waiting for the actual file to be deleted.
             // We set a temporary record (short lived) on the journal
             // to avoid a situation where the server is restarted and pending large message
stays on forever
-            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID()));
+            largeServerMessage.setPendingRecordID(storePendingLargeMessage(largeServerMessage.getMessageID(),
largeServerMessage.getPendingRecordID()));
          } catch (Exception e) {
             throw new ActiveMQInternalErrorException(e.getMessage(), e);
          }
@@ -427,7 +434,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
                readLock();
                try {
                   if (replicator != null) {
-                     replicator.largeMessageDelete(largeServerMessage.getMessageID());
+                     replicator.largeMessageDelete(largeServerMessage.getMessageID(), JournalStorageManager.this);
                   }
                   file.delete();
 
@@ -475,7 +482,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
 
          if (largeMessage.isDurable()) {
             // We store a marker on the journal that the large file is pending
-            long pendingRecordID = storePendingLargeMessage(id);
+            long pendingRecordID = storePendingLargeMessage(id, LargeServerMessage.NO_PENDING_ID);
 
             largeMessage.setPendingRecordID(pendingRecordID);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 79c1a85..b8236af 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -42,7 +42,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements
LargeSe
 
    private final JournalStorageManager storageManager;
 
-   private long pendingRecordID = -1;
+   private long pendingRecordID = NO_PENDING_ID;
 
    private boolean paged;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
index 42126d4..66ccd8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageInSync.java
@@ -158,4 +158,14 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage
{
       storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes);
    }
 
+   @Override
+   public void setPendingRecordID(long pendingRecordID) {
+      mainLM.setPendingRecordID(pendingRecordID);
+   }
+
+   @Override
+   public long getPendingRecordID() {
+      return mainLM.getPendingRecordID();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
index 4a09cc0..a9be86a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationLargeMessageEndMessage.java
@@ -23,31 +23,40 @@ import org.apache.activemq.artemis.utils.DataConstants;
 public class ReplicationLargeMessageEndMessage extends PacketImpl {
 
    long messageId;
+   long pendingRecordId;
 
    public ReplicationLargeMessageEndMessage() {
       super(PacketImpl.REPLICATION_LARGE_MESSAGE_END);
    }
 
-   public ReplicationLargeMessageEndMessage(final long messageId) {
+   public ReplicationLargeMessageEndMessage(final long messageId, final long pendingRecordId)
{
       this();
       this.messageId = messageId;
+      //we use negative value to indicate that this id is pre-generated by live node
+      //so that it won't be generated at backup.
+      //see https://issues.apache.org/jira/browse/ARTEMIS-1221
+      this.pendingRecordId = -pendingRecordId;
    }
 
-
    @Override
    public int expectedEncodeSize() {
       return PACKET_HEADERS_SIZE +
-         DataConstants.SIZE_LONG; // buffer.writeLong(messageId);
+         DataConstants.SIZE_LONG + // buffer.writeLong(messageId)
+         DataConstants.SIZE_LONG; // buffer.writeLong(pendingRecordId);
    }
 
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeLong(messageId);
+      buffer.writeLong(pendingRecordId);
    }
 
    @Override
    public void decodeRest(final ActiveMQBuffer buffer) {
       messageId = buffer.readLong();
+      if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
+         pendingRecordId = buffer.readLong();
+      }
    }
 
    /**
@@ -85,4 +94,8 @@ public class ReplicationLargeMessageEndMessage extends PacketImpl {
          return false;
       return true;
    }
+
+   public long getPendingRecordId() {
+      return pendingRecordId;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
index 3b6327a..b744805 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedLargeMessage.java
@@ -52,4 +52,8 @@ public interface ReplicatedLargeMessage {
     */
    void addBytes(byte[] body) throws Exception;
 
+   void setPendingRecordID(long pendingRecordID);
+
+   long getPendingRecordID();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 6f899f3..f879aeb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -479,6 +479,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       }
       final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true,
false);
       if (message != null) {
+         message.setPendingRecordID(packet.getPendingRecordId());
          executor.execute(new Runnable() {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 8a624ce..398f452 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
@@ -241,9 +242,11 @@ public final class ReplicationManager implements ActiveMQComponent {
       }
    }
 
-   public void largeMessageDelete(final Long messageId) {
+   //we pass in storageManager to generate ID only if enabled
+   public void largeMessageDelete(final Long messageId, JournalStorageManager storageManager)
{
       if (enabled) {
-         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId));
+         long pendingRecordID = storageManager.generateID();
+         sendReplicatePacket(new ReplicationLargeMessageEndMessage(messageId, pendingRecordID));
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 6fcc802..a80e369 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -23,13 +23,11 @@ import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage;
 
 public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage {
 
+   long NO_PENDING_ID = -1;
+
    @Override
    void addBytes(byte[] bytes) throws Exception;
 
-   void setPendingRecordID(long pendingRecordID);
-
-   long getPendingRecordID();
-
    /**
     * We have to copy the large message content in case of DLQ and paged messages
     * For that we need to pre-mark the LargeMessage with a flag when it is paged

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
new file mode 100644
index 0000000..76efc22
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestWithDivert.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FailoverTestWithDivert extends FailoverTestBase {
+
+   private static final String DIVERT_ADDRESS = "jms.queue.testQueue";
+   private static final String DIVERT_FORWARD_ADDRESS = "jms.queue.divertedQueue";
+   private ClientSessionFactoryInternal sf;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
+      return getNettyAcceptorTransportConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
+      return getNettyConnectorTransportConfiguration(live);
+   }
+
+   @Override
+   protected void createConfigs() throws Exception {
+      createReplicatedConfigs();
+
+      liveConfig.setJournalFileSize(10240000);
+      backupConfig.setJournalFileSize(10240000);
+      addQueue(liveConfig, DIVERT_ADDRESS, DIVERT_ADDRESS);
+      addQueue(liveConfig, DIVERT_FORWARD_ADDRESS, DIVERT_FORWARD_ADDRESS);
+      addDivert(liveConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
+      addDivert(backupConfig, DIVERT_ADDRESS, DIVERT_FORWARD_ADDRESS, false);
+   }
+
+   private void addQueue(Configuration serverConfig, String address, String name) {
+
+      List<CoreAddressConfiguration> addrConfigs = serverConfig.getAddressConfigurations();
+      CoreAddressConfiguration addrCfg = new CoreAddressConfiguration();
+      addrCfg.setName(address);
+      addrCfg.addRoutingType(RoutingType.ANYCAST);
+      CoreQueueConfiguration qConfig = new CoreQueueConfiguration();
+      qConfig.setName(name);
+      qConfig.setAddress(address);
+      addrCfg.addQueueConfiguration(qConfig);
+      addrConfigs.add(addrCfg);
+   }
+
+   private void addDivert(Configuration serverConfig, String source, String target, boolean
exclusive) {
+      List<DivertConfiguration> divertConfigs = serverConfig.getDivertConfigurations();
+      DivertConfiguration newDivert = new DivertConfiguration();
+      newDivert.setName("myDivert");
+      newDivert.setAddress(source);
+      newDivert.setForwardingAddress(target);
+      newDivert.setExclusive(exclusive);
+      divertConfigs.add(newDivert);
+   }
+
+   @Test
+   public void testUniqueIDsWithDivert() throws Exception {
+      Map<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+      TransportConfiguration tc = createTransportConfiguration(true, false, params);
+      ServerLocator locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(tc)).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(-1);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      int minLarge = locator.getMinLargeMessageSize();
+
+      ClientSession session = sf.createSession(false, false);
+      addClientSession(session);
+      session.start();
+
+      final int num = 100;
+      ClientProducer producer = session.createProducer(DIVERT_ADDRESS);
+      for (int i = 0; i < num; i++) {
+         ClientMessage message = createLargeMessage(session, 2 * minLarge);
+         producer.send(message);
+      }
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer(DIVERT_ADDRESS);
+      for (int i = 0;  i < num; i++) {
+         ClientMessage receivedFromSourceQueue = consumer.receive(5000);
+         assertNotNull(receivedFromSourceQueue);
+         receivedFromSourceQueue.acknowledge();
+      }
+      session.commit();
+
+      crash(session);
+
+      ClientConsumer consumer1 = session.createConsumer(DIVERT_FORWARD_ADDRESS);
+      for (int i = 0; i < num; i++) {
+         ClientMessage receivedFromTargetQueue = consumer1.receive(5000);
+         assertNotNull(receivedFromTargetQueue);
+         receivedFromTargetQueue.acknowledge();
+      }
+      session.commit();
+   }
+
+   private ClientMessage createLargeMessage(ClientSession session, final int largeSize) {
+      ClientMessage message = session.createMessage(true);
+      ActiveMQBuffer bodyBuffer = message.getBodyBuffer();
+      final int propSize = 10240;
+      while (bodyBuffer.writerIndex() < largeSize) {
+         byte[] prop = new byte[propSize];
+         bodyBuffer.writeBytes(prop);
+      }
+      return message;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d50f577c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index ab32517..d67b980 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -204,7 +204,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
    public void testSendPackets() throws Exception {
       setupServer(true);
 
-      StorageManager storage = getStorage();
+      JournalStorageManager storage = getStorage();
 
       manager = liveServer.getReplicationManager();
       waitForComponent(manager);
@@ -270,7 +270,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
 
       manager.largeMessageWrite(500, new byte[1024]);
 
-      manager.largeMessageDelete(Long.valueOf(500));
+      manager.largeMessageDelete(Long.valueOf(500), storage);
 
       blockOnReplication(storage, manager);
 


Mime
View raw message