activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1189980 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
Date Thu, 27 Oct 2011 19:58:44 GMT
Author: tabish
Date: Thu Oct 27 19:58:42 2011
New Revision: 1189980

URL: http://svn.apache.org/viewvc?rev=1189980&view=rev
Log:
Apply patch for https://issues.apache.org/jira/browse/AMQ-3541

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?rev=1189980&r1=1189979&r2=1189980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
Thu Oct 27 19:58:42 2011
@@ -18,6 +18,9 @@ package org.apache.activemq.broker.util;
 
 import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,12 +29,12 @@ import org.slf4j.LoggerFactory;
  * A Broker interceptor which updates a JMS Client's timestamp on the message
  * with a broker timestamp. Useful when the clocks on client machines are known
  * to not be correct and you can only trust the time set on the broker machines.
- * 
+ *
  * Enabling this plugin will break JMS compliance since the timestamp that the
  * producer sees on the messages after as send() will be different from the
  * timestamp the consumer will observe when he receives the message. This plugin
  * is not enabled in the default ActiveMQ configuration.
- * 
+ *
  * 2 new attributes have been added which will allow the administrator some override control
  * over the expiration time for incoming messages:
  *
@@ -41,38 +44,38 @@ import org.slf4j.LoggerFactory;
  * Attribute 'ttlCeiling' can be used to apply a limit to the expiration time
  *
  * @org.apache.xbean.XBean element="timeStampingBrokerPlugin"
- * 
- * 
+ *
+ *
  */
 public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
     private static final Logger LOG = LoggerFactory.getLogger(TimeStampingBrokerPlugin.class);
-    /** 
+    /**
     * variable which (when non-zero) is used to override
     * the expiration date for messages that arrive with
     * no expiration date set (in Milliseconds).
     */
     long zeroExpirationOverride = 0;
 
-    /** 
+    /**
     * variable which (when non-zero) is used to limit
-    * the expiration date (in Milliseconds).  
+    * the expiration date (in Milliseconds).
     */
     long ttlCeiling = 0;
-    
+
     /**
      * If true, the plugin will not update timestamp to past values
      * False by default
      */
     boolean futureOnly = false;
-    
-    
+
+
     /**
      * if true, update timestamp even if message has passed through a network
      * default false
      */
     boolean processNetworkMessages = false;
 
-    /** 
+    /**
     * setter method for zeroExpirationOverride
     */
     public void setZeroExpirationOverride(long ttl)
@@ -80,7 +83,7 @@ public class TimeStampingBrokerPlugin ex
         this.zeroExpirationOverride = ttl;
     }
 
-    /** 
+    /**
     * setter method for ttlCeiling
     */
     public void setTtlCeiling(long ttlCeiling)
@@ -88,19 +91,21 @@ public class TimeStampingBrokerPlugin ex
         this.ttlCeiling = ttlCeiling;
     }
 
-	public void setFutureOnly(boolean futureOnly) {
-		this.futureOnly = futureOnly;
-	}
-	
-	public void setProcessNetworkMessages(Boolean processNetworkMessages) {
-	    this.processNetworkMessages = processNetworkMessages;
-	}
+    public void setFutureOnly(boolean futureOnly) {
+        this.futureOnly = futureOnly;
+    }
+
+    public void setProcessNetworkMessages(Boolean processNetworkMessages) {
+        this.processNetworkMessages = processNetworkMessages;
+    }
 
-	@Override
+    @Override
     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception
{
-        if (message.getTimestamp() > 0
-            && (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length
== 0))) {
+
+        if (message.getTimestamp() > 0 && !isDestinationDLQ(message) &&
+           (processNetworkMessages || (message.getBrokerPath() == null || message.getBrokerPath().length
== 0))) {
             // timestamp not been disabled and has not passed through a network or processNetworkMessages=true
+
             long oldExpiration = message.getExpiration();
             long newTimeStamp = System.currentTimeMillis();
             long timeToLive = zeroExpirationOverride;
@@ -112,17 +117,40 @@ public class TimeStampingBrokerPlugin ex
                 timeToLive = ttlCeiling;
             }
             long expiration = timeToLive + newTimeStamp;
-			//In the scenario that the Broker is behind the clients we never want to set the Timestamp
and Expiration in the past 
-			if(!futureOnly || (expiration > oldExpiration)) {
-				if (timeToLive > 0 && expiration > 0) {
-					message.setExpiration(expiration);
-				}
-				message.setTimestamp(newTimeStamp);
-				if (LOG.isDebugEnabled()) {
-				    LOG.debug("Set message " + message.getMessageId() + " timestamp from " + oldTimestamp
+ " to " + newTimeStamp);
-				}
-			}
+            // In the scenario that the Broker is behind the clients we never want to set
the
+            // Timestamp and Expiration in the past
+            if(!futureOnly || (expiration > oldExpiration)) {
+                if (timeToLive > 0 && expiration > 0) {
+                    message.setExpiration(expiration);
+                }
+                message.setTimestamp(newTimeStamp);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Set message " + message.getMessageId() + " timestamp from
" + oldTimestamp + " to " + newTimeStamp);
+                }
+            }
         }
         super.send(producerExchange, message);
     }
+
+    private boolean isDestinationDLQ(Message message) {
+        DeadLetterStrategy deadLetterStrategy;
+        Message tmp;
+
+        if (message != null && message.getRegionDestination() != null) {
+            deadLetterStrategy = message.getRegionDestination().getDeadLetterStrategy();
+            if (deadLetterStrategy != null) {
+                // Cheap copy, since we only need two fields
+                tmp = new ActiveMQMessage();
+                tmp.setDestination(message.getOriginalDestination());
+                tmp.setRegionDestination(message.getRegionDestination());
+
+                // Determine if we are headed for a DLQ
+                ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(tmp,
null);
+                if (deadLetterDestination.equals(message.getDestination())) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java?rev=1189980&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
Thu Oct 27 19:58:42 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TimeStampingBrokerPluginTest extends TestCase {
+
+	BrokerService broker;
+	TransportConnector tcpConnector;
+	MessageProducer producer;
+	MessageConsumer consumer;
+	Connection connection;
+	Session session;
+	Destination destination;
+	String queue = "TEST.FOO";
+	long expiry = 500;
+	
+	@Before
+	public void setUp() throws Exception {
+		TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
+    	tsbp.setZeroExpirationOverride(expiry);
+    	tsbp.setTtlCeiling(expiry);
+    	
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setPlugins(new BrokerPlugin[] {tsbp});
+        tcpConnector = broker.addConnector("tcp://localhost:0");
+        
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setProcessExpired(true);
+        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(true);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+        
+        broker.start();
+        // Create a ConnectionFactory
+        ActiveMQConnectionFactory connectionFactory =
+            new ActiveMQConnectionFactory(tcpConnector.getConnectUri());
+
+        // Create a Connection
+        connection = connectionFactory.createConnection();
+        connection.start();
+
+        // Create a Session
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Create the destination Queue
+        destination = session.createQueue(queue);
+
+        // Create a MessageProducer from the Session to the Topic or Queue
+        producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+	}
+	
+	@After
+	public void tearDown() throws Exception {
+	     // Clean up
+        producer.close();
+        consumer.close();
+        session.close();
+        connection.close();
+        broker.stop();
+	}
+	@Test
+    public void testExpirationSet() throws Exception {
+    	
+        // Create a messages
+        Message sentMessage = session.createMessage();
+
+        // Tell the producer to send the message
+        long beforeSend = System.currentTimeMillis();
+        producer.send(sentMessage);
+
+        // Create a MessageConsumer from the Session to the Topic or Queue
+        consumer = session.createConsumer(destination);
+
+        // Wait for a message
+        Message receivedMessage = consumer.receive(1000);
+
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+        
+        // assert message timestamp is in window
+        assertTrue("Expiration should be not null" + receivedMessage.getJMSExpiration() +
"\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null);
+
+        // assert message expiration is in window
+        assertTrue("Before send: " + beforeSend + " Msg ts: " + receivedMessage.getJMSTimestamp()
+ " Msg Expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration()
&& receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry));
+    }
+    @Test
+    public void testExpirationCelingSet() throws Exception {
+    	
+        // Create a messages
+        Message sentMessage = session.createMessage();
+        // Tell the producer to send the message
+        long beforeSend = System.currentTimeMillis();
+        long sendExpiry =  beforeSend + (expiry*22);
+        sentMessage.setJMSExpiration(sendExpiry);
+
+        producer.send(sentMessage);
+
+        // Create a MessageConsumer from the Session to the Topic or Queue
+        consumer = session.createConsumer(destination);
+
+        // Wait for a message
+        Message receivedMessage = consumer.receive(1000);
+
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+        
+        // assert message timestamp is in window
+        assertTrue("Expiration should be not null" + receivedMessage.getJMSExpiration() +
"\n", Long.valueOf(receivedMessage.getJMSExpiration()) != null);
+
+        // assert message expiration is in window
+        assertTrue("Sent expiry: " + sendExpiry + " Recv ts: " + receivedMessage.getJMSTimestamp()
+ " Recv expiry: " + receivedMessage.getJMSExpiration(), beforeSend <= receivedMessage.getJMSExpiration()
&& receivedMessage.getJMSExpiration() <= (receivedMessage.getJMSTimestamp() + expiry));
+    }
+    
+    @Test
+    public void testExpirationDLQ() throws Exception {
+    	
+        // Create a messages
+        Message sentMessage = session.createMessage();
+        // Tell the producer to send the message
+        long beforeSend = System.currentTimeMillis();
+        long sendExpiry =  beforeSend + expiry;
+        sentMessage.setJMSExpiration(sendExpiry);
+
+        producer.send(sentMessage);
+
+        // Create a MessageConsumer from the Session to the Topic or Queue
+        consumer = session.createConsumer(destination);
+
+        Thread.sleep(expiry+250);
+        
+        // Wait for a message
+        Message receivedMessage = consumer.receive(1000);
+
+        // Message should roll to DLQ
+        assertNull(receivedMessage);
+                
+        // Close old consumer, setup DLQ listener
+        consumer.close();
+        consumer = session.createConsumer(session.createQueue("DLQ."+queue));
+        
+        // Get mesage from DLQ
+        receivedMessage = consumer.receive(1000);
+
+        // assert we got the same message ID we sent
+        assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
+        
+        // assert message timestamp is in window
+        //System.out.println("Recv: " + receivedMessage.getJMSExpiration());
+        assertEquals("Expiration should be zero" + receivedMessage.getJMSExpiration() + "\n",
receivedMessage.getJMSExpiration(), 0);
+        
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/TimeStampingBrokerPluginTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message