activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1021466 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/bugs/
Date Mon, 11 Oct 2010 19:06:03 GMT
Author: gtully
Date: Mon Oct 11 19:06:02 2010
New Revision: 1021466

URL: http://svn.apache.org/viewvc?rev=1021466&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2103 and https://issues.apache.org/activemq/browse/AMQ-2966.
Implement the fix for AMQ-2103 using a boolean policy for queues named reduceMemoryFootprint. When it is set,
the internal state of a message is cleared after the message is persisted. This works at a natural sync
point in the broker and avoids contention. The contention to marshall introduced by the original patch
resulted in AMQ-2966.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java 
 (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Mon Oct 11 19:06:02 2010
@@ -86,6 +86,7 @@ public abstract class BaseDestination im
     private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
     private boolean gcIfInactive;
     private long lastActiveTime=0l;
+    private boolean reduceMemoryFootprint = false;
 
     /**
      * @param broker
@@ -662,5 +663,12 @@ public abstract class BaseDestination im
         }
         return result;
     }
-    
+
+    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
+        this.reduceMemoryFootprint = reduceMemoryFootprint;
+    }
+
+    protected boolean isReduceMemoryFootprint() {
+        return this.reduceMemoryFootprint;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Oct 11 19:06:02 2010
@@ -668,6 +668,9 @@ public class Queue extends BaseDestinati
             if (store != null && message.isPersistent()) {        
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 result = store.asyncAddQueueMessage(context, message);
+                if (isReduceMemoryFootprint()) {
+                    message.clearMarshalledState();
+                }
             }
             if (context.isInTransaction()) {
                 // If this is a transacted message.. increase the usage now so that

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Mon Oct 11 19:06:02 2010
@@ -91,6 +91,7 @@ public class PolicyEntry extends Destina
     private boolean allConsumersExclusiveByDefault;
     private boolean gcInactiveDestinations;
     private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
+    private boolean reduceMemoryFootprint;
     
    
     public void configure(Broker broker,Queue queue) {
@@ -163,6 +164,7 @@ public class PolicyEntry extends Destina
         destination.setPrioritizedMessages(isPrioritizedMessages());
         destination.setGcIfInactive(isGcInactiveDestinations());
         destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
+        destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -780,5 +782,12 @@ public class PolicyEntry extends Destina
     public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
         this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
     }
+    
+    public boolean isReduceMemoryFootprint() {
+        return reduceMemoryFootprint;
+    }
 
+    public void setReduceMemoryFootprint(boolean reduceMemoryFootprint) {
+        this.reduceMemoryFootprint = reduceMemoryFootprint;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
Mon Oct 11 19:06:02 2010
@@ -118,6 +118,11 @@ public class ActiveMQMapMessage extends 
         storeContent();
     }
 
+    public void clearMarshalledState() throws JMSException {
+        super.clearMarshalledState();
+        map.clear();
+    }
+
     private void storeContent() {
         try {
             if (getContent() == null && !map.isEmpty()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
Mon Oct 11 19:06:02 2010
@@ -201,6 +201,11 @@ public class ActiveMQObjectMessage exten
         storeContent();
     }
 
+    public void clearMarshalledState() throws JMSException {
+        super.clearMarshalledState();
+        this.object = null;
+    }
+
     public void onMessageRolledBack() {
         super.onMessageRolledBack();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
Mon Oct 11 19:06:02 2010
@@ -121,14 +121,13 @@ public class ActiveMQTextMessage extends
         }
     }
 
-    @Override
-    public void afterMarshall(WireFormat wireFormat) throws IOException {
-        super.afterMarshall(wireFormat);
-        //see https://issues.apache.org/activemq/browse/AMQ-2103
-        // and https://issues.apache.org/activemq/browse/AMQ-2966
-        this.text=null;
+    // see https://issues.apache.org/activemq/browse/AMQ-2103
+    // and https://issues.apache.org/activemq/browse/AMQ-2966
+    public void clearMarshalledState() throws JMSException {
+        super.clearMarshalledState();
+        this.text = null;
     }
-    
+
     /**
      * Clears out the message body. Clearing a message's body does not clear its
      * header values or property entries. <p/>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=1021466&r1=1021465&r2=1021466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon
Oct 11 19:06:02 2010
@@ -94,6 +94,11 @@ public abstract class Message extends Ba
     public abstract Message copy();
     public abstract void clearBody() throws JMSException;
 
+    // useful to reduce the memory footprint of a persisted message
+    public void clearMarshalledState() throws JMSException {
+        properties = null;
+    }
+
     protected void copy(Message copy) {
         super.copy(copy);
         copy.producerId = producerId;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java?rev=1021466&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java Mon
Oct 11 19:06:02 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.usecases.MyObject;
+
+public class AMQ2103Test extends BrokerTestSupport {
+    static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
+    static {
+        reduceMemoryFootprint.setReduceMemoryFootprint(true);
+    }
+
+    public PolicyEntry defaultPolicy = reduceMemoryFootprint;
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        return defaultPolicy;
+    }
+
+    public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception {
+        addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});    
+    }
+
+    public static Test suite() {
+        return suite(AMQ2103Test.class);
+    }
+
+    /**
+     * use mem persistence so no marshaling,
+     * reduceMemoryFootprint on/off that will reduce memory by whacking the marshaled state
+     * With vm transport and deferred serialisation and no persistence (mem persistence),
+     * we see the message as sent by the client so we can validate the contents against
+     * the policy
+     * @throws Exception
+     */
+    public void testVerifyMarshalledStateIsCleared() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory.setOptimizedMessageDispatch(true);
+        factory.setObjectMessageSerializationDefered(true);
+        factory.setCopyMessageOnSend(false);
+
+        Connection connection = factory.createConnection();
+        Session session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQDestination destination = new ActiveMQQueue("testQ");
+		MessageConsumer consumer = session.createConsumer(destination);
+		connection.start();
+
+        MessageProducer producer = session.createProducer(destination);
+        final MyObject obj = new MyObject("A message");
+        ActiveMQObjectMessage m1 = (ActiveMQObjectMessage)session.createObjectMessage();
+        m1.setObject(obj);
+        producer.send(m1);
+
+        ActiveMQTextMessage m2 = new ActiveMQTextMessage();
+        m2.setText("Test Message Payload.");
+        producer.send(m2);
+
+        ActiveMQMapMessage m3 = new ActiveMQMapMessage();
+        m3.setString("text", "my message");
+        producer.send(m3);
+
+        Message m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQObjectMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("object data cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)",
+                ((ActiveMQObjectMessage)m).getObject());
+        }
+
+        // verify no serialisation via vm transport
+        assertEquals("writeObject called", 0, obj.getWriteObjectCalled());
+        assertEquals("readObject called", 0, obj.getReadObjectCalled());
+        assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled());
+
+        m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m2.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQTextMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)",
+                ((ActiveMQTextMessage)m).getText());
+        }
+
+        m = consumer.receive(maxWait);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId().toString(), m.getJMSMessageID());
+        assertTrue(m instanceof ActiveMQMapMessage);
+
+        if (getDefaultPolicy() != null) {
+            assertNull("text cleared by reduceMemoryFootprint (and never marshalled as using mem persistence)",
+                ((ActiveMQMapMessage)m).getStringProperty("text"));
+        }
+
+        connection.close();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message