activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] 01/03: ARTEMIS-2900 Expose property (getWholeMessageSize) so users can intercept size of messages and large messages
Date Mon, 14 Sep 2020 19:35:56 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7cf5289efa011493a3a69a96d1293609204018d6
Author: Clebert Suconic <clebertsuconic@apache.org>
AuthorDate: Mon Sep 14 10:07:56 2020 -0400

    ARTEMIS-2900 Expose property (getWholeMessageSize) so users can intercept size of messages and large messages
---
 .../apache/activemq/artemis/api/core/Message.java  |  10 +
 .../protocol/amqp/broker/AMQPLargeMessage.java     |  22 ++
 .../impl/journal/LargeServerMessageImpl.java       |  11 +
 .../largemessages/AMQPLargeMessageSizeTest.java    | 239 +++++++++++++++++++++
 4 files changed, 282 insertions(+)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 4ade8bb..2d3c548 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -656,6 +656,16 @@ public interface Message {
    int getEncodeSize();
 
    /**
+    * Return an estimate of the size of the message on the wire.
+    * for LargeMessages this will contain whatever is needed to encode properties and the body size of large messages.
+    * For AMQP this will return the whole body size of the message as the body will contain all the data including properties.
+    * @return
+    */
+   default long getWholeMessageSize() {
+      return getEncodeSize();
+   }
+
+   /**
     * Returns all the names of the properties for this message.
     */
    Set<SimpleString> getPropertyNames();
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index d37888f..d540de3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -104,6 +104,18 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
       this.storageManager = storageManager;
    }
 
+   public AMQPLargeMessage(long id,
+                           long messageFormat,
+                           TypedProperties extraProperties,
+                           CoreMessageObjectPools coreMessageObjectPools,
+                           StorageManager storageManager,
+                           LargeBody largeBody) {
+      super(messageFormat, extraProperties, coreMessageObjectPools);
+      this.setMessageID(id);
+      this.largeBody = largeBody;
+      this.storageManager = storageManager;
+   }
+
    /**
     * Copy constructor
     */
@@ -374,6 +386,16 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
       return 0;
    }
 
+   @Override
+   public long getWholeMessageSize() {
+      try {
+         return largeBody.getBodySize();
+      } catch (Exception e) {
+         logger.warn(e.getMessage());
+         return -1;
+      }
+   }
+
 
    @Override
    public int getMemoryEstimate() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index f931121..3fadb16 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -189,6 +189,17 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar
       }
    }
 
+
+   @Override
+   public long getWholeMessageSize() {
+      try {
+         return getEncodeSize() + largeBody.getBodySize();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         return getEncodeSize();
+      }
+   }
+
    public void encode(final ActiveMQBuffer buffer1) {
       super.encodeHeadersAndProperties(buffer1.byteBuf());
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageSizeTest.java
new file mode 100644
index 0000000..81ddfc3
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageSizeTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.persistence.impl.journal.LargeBody;
+import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AMQPLargeMessageSizeTest extends ActiveMQTestBase {
+
+   @Test
+   public void testAMQPMockLargeMessageSize() throws Exception {
+      AMQPLargeMessage amqpLargeMessage = new AMQPLargeMessage(1, 0, null, null, null, new MockLargeBody(123456));
+      Assert.assertTrue(amqpLargeMessage.getWholeMessageSize() >= 123456);
+   }
+
+   @Test
+   public void testCoreMockLargeMessageSize() throws Exception {
+      LargeServerMessageImpl largeServerMessage = new LargeServerMessageImpl((byte) 0, 1, new MockSM(123456), null);
+      Assert.assertTrue(largeServerMessage.getWholeMessageSize() >= 123456);
+   }
+
+   private static class MockSM extends NullStorageManager {
+
+      final long size;
+
+      MockSM(long size) {
+         this.size = size;
+      }
+
+      @Override
+      public SequentialFile createFileForLargeMessage(long messageID, boolean durable) {
+         return new MockFileSize(size);
+      }
+
+      @Override
+      public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) {
+         return new MockFileSize(size);
+      }
+   }
+
+   private static class MockFileSize implements SequentialFile {
+
+      final long size;
+
+      MockFileSize(long size) {
+         this.size = size;
+      }
+
+      @Override
+      public boolean isOpen() {
+         return false;
+      }
+
+      @Override
+      public boolean exists() {
+         return false;
+      }
+
+      @Override
+      public void open() throws Exception {
+
+      }
+
+      @Override
+      public void open(int maxIO, boolean useExecutor) throws Exception {
+
+      }
+
+      @Override
+      public ByteBuffer map(int position, long size) throws IOException {
+         return null;
+      }
+
+      @Override
+      public boolean fits(int size) {
+         return false;
+      }
+
+      @Override
+      public int calculateBlockStart(int position) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public String getFileName() {
+         return null;
+      }
+
+      @Override
+      public void fill(int size) throws Exception {
+
+      }
+
+      @Override
+      public void delete() throws IOException, InterruptedException, ActiveMQException {
+
+      }
+
+      @Override
+      public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
+
+      }
+
+      @Override
+      public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
+
+      }
+
+      @Override
+      public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
+
+      }
+
+      @Override
+      public void write(EncodingSupport bytes, boolean sync) throws Exception {
+
+      }
+
+      @Override
+      public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
+
+      }
+
+      @Override
+      public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
+
+      }
+
+      @Override
+      public void blockingWriteDirect(ByteBuffer bytes, boolean sync, boolean releaseBuffer) throws Exception {
+
+      }
+
+      @Override
+      public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public int read(ByteBuffer bytes) throws Exception {
+         return 0;
+      }
+
+      @Override
+      public void position(long pos) throws IOException {
+
+      }
+
+      @Override
+      public long position() {
+         return 0;
+      }
+
+      @Override
+      public void close() throws Exception {
+
+      }
+
+      @Override
+      public void sync() throws IOException {
+
+      }
+
+      @Override
+      public long size() throws Exception {
+         return size;
+      }
+
+      @Override
+      public void renameTo(String newFileName) throws Exception {
+
+      }
+
+      @Override
+      public SequentialFile cloneFile() {
+         return null;
+      }
+
+      @Override
+      public void copyTo(SequentialFile newFileName) throws Exception {
+
+      }
+
+      @Override
+      public void setTimedBuffer(TimedBuffer buffer) {
+
+      }
+
+      @Override
+      public File getJavaFile() {
+         return null;
+      }
+   }
+
+   private static class MockLargeBody extends LargeBody {
+
+      final long size;
+
+      MockLargeBody(long size) {
+         super(null, null);
+         this.size = size;
+      }
+
+      @Override
+      public long getBodySize() {
+         return size;
+      }
+   }
+
+}


Mime
View raw message