activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-465 Create LM on write packet in ReplicationEndpoint
Date Tue, 05 Apr 2016 03:20:18 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master b16b864f0 -> 315c5ffe8


ARTEMIS-465 Create LM on write packet in ReplicationEndpoint

If a large message (LM) write packet is received from the live server, assume
that the large message exists and create a local reference.

The old behaviour would reject the packet, which could lead to loss of data on
failover; see the JIRA issue (ARTEMIS-465).


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e2c6d0b7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e2c6d0b7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e2c6d0b7

Branch: refs/heads/master
Commit: e2c6d0b7f775da430501aa7230fc25d5dd2f62df
Parents: 7da22ff
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Fri Apr 1 23:13:04 2016 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Apr 4 23:15:54 2016 -0400

----------------------------------------------------------------------
 .../impl/journal/JournalStorageManager.java       |  4 ++++
 .../impl/journal/LargeServerMessageImpl.java      |  2 +-
 .../core/replication/ReplicationEndpoint.java     | 18 ++++++++++++------
 3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e2c6d0b7/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 8807f4e..7deebcb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -438,6 +438,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
 
          largeMessage.setMessageID(id);
 
+         // We do this here to avoid a case where the replication gets a list without this
file
+         // to avoid a race
+         largeMessage.validateFile();
+
          if (largeMessage.isDurable()) {
             // We store a marker on the journal that the large file is pending
             long pendingRecordID = storePendingLargeMessage(id);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e2c6d0b7/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 906cbd3..c24924a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -341,7 +341,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements
L
 
    // Private -------------------------------------------------------
 
-   private synchronized void validateFile() throws ActiveMQException {
+   public synchronized void validateFile() throws ActiveMQException {
       try {
          if (file == null) {
             if (messageID <= 0) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e2c6d0b7/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 5b6dbfb..4cc937c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -435,7 +435,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       SequentialFile channel1;
       switch (msg.getFileType()) {
          case LARGE_MESSAGE: {
-            ReplicatedLargeMessage largeMessage = lookupLargeMessage(id, false);
+            ReplicatedLargeMessage largeMessage = lookupLargeMessage(id, false, false);
             if (!(largeMessage instanceof LargeServerMessageInSync)) {
                ActiveMQServerLogger.LOGGER.largeMessageIncompatible();
                return;
@@ -536,7 +536,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
    }
 
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {
-      final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true);
+      final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true,
false);
       if (message != null) {
          executor.execute(new Runnable() {
             @Override
@@ -556,13 +556,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
     * @param packet
     */
    private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet)
throws Exception {
-      ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), false);
+      ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), false, true);
       if (message != null) {
          message.addBytes(packet.getBody());
       }
    }
 
-   private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean
delete) {
+   private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean
delete, final boolean createIfNotExists) {
       ReplicatedLargeMessage message;
 
       if (delete) {
@@ -571,8 +571,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
       else {
          message = largeMessages.get(messageId);
          if (message == null) {
-            // No warnings if it's a delete, as duplicate deletes may be sent repeatedly.
-            ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId);
+            if (createIfNotExists) {
+               createLargeMessage(messageId, false);
+               message = largeMessages.get(messageId);
+            }
+            else {
+               // No warnings if it's a delete, as duplicate deletes may be sent repeatedly.
+               ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId);
+            }
          }
       }
 


Mime
View raw message