activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1650 Improve paged message acknowledge
Date Thu, 08 Feb 2018 14:15:41 GMT
ARTEMIS-1650 Improve paged message acknowledge

Cache `messageID`, `transactionID` and `isLargeMessage`
in PagedReference, so that when acknowledging we do not have to
fetch the PagedMessage, which may have been GCed, forcing a re-read of the entire page.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/822445a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/822445a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/822445a7

Branch: refs/heads/master
Commit: 822445a717f943c64a84b2ac6a0af8ace9e5cd23
Parents: 33b265c
Author: huaishk <shoukunhuai@gmail.com>
Authored: Wed Jan 31 09:48:43 2018 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Feb 8 09:12:57 2018 -0500

----------------------------------------------------------------------
 .../core/paging/cursor/PagedReference.java      |  4 ++++
 .../core/paging/cursor/PagedReferenceImpl.java  | 24 ++++++++++++++++++++
 .../cursor/impl/PageSubscriptionImpl.java       |  4 ++--
 .../artemis/core/server/MessageReference.java   |  2 ++
 .../core/server/impl/LastValueQueue.java        |  5 ++++
 .../core/server/impl/MessageReferenceImpl.java  |  5 ++++
 .../artemis/core/server/impl/RefsOperation.java |  5 +++-
 .../core/server/impl/ServerConsumerImpl.java    |  2 +-
 8 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
index c1ff089..be2d042 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReference.java
@@ -24,4 +24,8 @@ public interface PagedReference extends MessageReference {
    PagePosition getPosition();
 
    PagedMessage getPagedMessage();
+
+   boolean isLargeMessage();
+
+   long getTransactionID();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 7189007..42c5423 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -53,6 +53,12 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    private Object protocolData;
 
+   private final boolean largeMessage;
+
+   private final long transactionID;
+
+   private final long messageID;
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -95,6 +101,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       this.position = position;
       this.message = new WeakReference<>(message);
       this.subscription = subscription;
+      this.largeMessage = message.getMessage().isLargeMessage();
+      this.transactionID = message.getTransactionID();
+      this.messageID = message.getMessage().getMessageID();
    }
 
    @Override
@@ -256,4 +265,19 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
       return this.consumerId;
    }
 
+   @Override
+   public boolean isLargeMessage() {
+      return largeMessage;
+   }
+
+   @Override
+   public long getTransactionID() {
+      return transactionID;
+   }
+
+   @Override
+   public long getMessageID() {
+      return messageID;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index a674935..24c69be 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -849,8 +849,8 @@ final class PageSubscriptionImpl implements PageSubscription {
    }
 
    private PageTransactionInfo getPageTransaction(final PagedReference reference) throws ActiveMQException {
-      if (reference.getPagedMessage().getTransactionID() >= 0) {
-         return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
+      if (reference.getTransactionID() >= 0) {
+         return pageStore.getPagingManager().getTransaction(reference.getTransactionID());
       } else {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 799b0b0..906ea7e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -38,6 +38,8 @@ public interface MessageReference {
 
    Message getMessage();
 
+   long getMessageID();
+
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,
     * so we need to perform some extra steps on paging.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 7aada5e..90b8814 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -238,6 +238,11 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
+      public long getMessageID() {
+         return getMessage().getMessageID();
+      }
+
+      @Override
       public Queue getQueue() {
          return ref.getQueue();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 1b434bc..7543ba5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -147,6 +147,11 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    }
 
    @Override
+   public long getMessageID() {
+      return getMessage().getMessageID();
+   }
+
+   @Override
    public Queue getQueue() {
       return queue;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 6bf69ed..e492985 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -159,7 +160,9 @@ public class RefsOperation extends TransactionOperationAbstract {
 
       if (pagedMessagesToPostACK != null) {
          for (MessageReference refmsg : pagedMessagesToPostACK) {
-            decrementRefCount(refmsg);
+            if (((PagedReference) refmsg).isLargeMessage()) {
+               decrementRefCount(refmsg);
+            }
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/822445a7/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 45dd05c..95d613e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -867,7 +867,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
             acks++;
          }
-         while (ref.getMessage().getMessageID() != messageID);
+         while (ref.getMessageID() != messageID);
 
          if (startedTransaction) {
             tx.commit();


Mime
View raw message