activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r357732 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/activemq/ main/java/org/activemq/broker/region/ main/java/org/activemq/broker/region/policy/ test/java/org/activemq/test/retroactive/ test/resources/org/activemq/test/r...
Date Mon, 19 Dec 2005 17:43:38 GMT
Author: jstrachan
Date: Mon Dec 19 09:43:22 2005
New Revision: 357732

URL: http://svn.apache.org/viewcvs?rev=357732&view=rev
Log:
* added test case to demonstrate query-based subscription recovery policy in action.
* minor refactor to the SubscriptionRecoveryPolicy API to make it easy to generate messages
from inside the recovery policy

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
Mon Dec 19 09:43:22 2005
@@ -48,6 +48,11 @@
 import org.activemq.command.ActiveMQTextMessage;
 import org.activemq.command.ActiveMQTopic;
 
+/**
+ * A helper class for converting normal JMS interfaces into ActiveMQ specific ones.
+ * 
+ * @version $Revision: 1.1 $
+ */
 public class ActiveMQMessageTransformation {
 
 	/**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
Mon Dec 19 09:43:22 2005
@@ -90,7 +90,7 @@
         }
         else {
             if (sub.getConsumerInfo().isRetroactive()) {
-                subscriptionRecoveryPolicy.recover(context, sub);
+                subscriptionRecoveryPolicy.recover(context, this, sub);
             }
             consumers.add(sub);
         }
@@ -272,8 +272,9 @@
         dispatchValve.increment();
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
         try {
-
-            subscriptionRecoveryPolicy.add(context, message);
+            if (! subscriptionRecoveryPolicy.add(context, message)) {
+                return;
+            }
             if (consumers.isEmpty())
                 return;
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -24,6 +24,7 @@
 import org.activemq.broker.ConnectionContext;
 import org.activemq.broker.region.MessageReference;
 import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
 import org.activemq.filter.MessageEvaluationContext;
 import org.activemq.memory.list.DestinationBasedMessageList;
 import org.activemq.memory.list.MessageList;
@@ -44,11 +45,12 @@
     private int maximumSize = 100 * 64 * 1024;
     private boolean useSharedBuffer = true;
 
-    public void add(ConnectionContext context, MessageReference message) throws Throwable {
+    public boolean add(ConnectionContext context, MessageReference message) throws Throwable {
         buffer.add(message);
+        return true;
     }
 
-    public void recover(ConnectionContext context, Subscription sub) throws Throwable {
+    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
         // Re-dispatch the messages from the buffer.
         List copy = buffer.getMessages(sub);
         if( !copy.isEmpty() ) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -21,6 +21,7 @@
 import org.activemq.broker.ConnectionContext;
 import org.activemq.broker.region.MessageReference;
 import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
 import org.activemq.filter.MessageEvaluationContext;
 
 /**
@@ -35,11 +36,12 @@
 
     volatile private MessageReference lastImage;
 
-    public void add(ConnectionContext context, MessageReference node) throws Throwable {
+    public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
         lastImage = node;
+        return true;
     }
 
-    public void recover(ConnectionContext context, Subscription sub) throws Throwable {
+    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
         // Re-dispatch the last message seen.
         MessageReference node = lastImage;
         if( node != null ){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
Mon Dec 19 09:43:22 2005
@@ -18,11 +18,12 @@
 package org.activemq.broker.region.policy;
 
 import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.Message;
 
 import javax.jms.MessageListener;
 
 /**
- * Represents some kind of query which will load messages from some source.
+ * Represents some kind of query which will load initial messages from some source for a new topic subscriber.
  * 
  * @version $Revision$
  */
@@ -34,6 +35,20 @@
      * @param destination the destination on which the query is to be performed
      * @param listener is the listener to notify as each message is created or loaded
      */
-    public void execute(ActiveMQDestination destination, MessageListener listener);
+    public void execute(ActiveMQDestination destination, MessageListener listener) throws Exception;
+
+    /**
+     * Returns true if the given update is valid and does not overlap with the initial message query.
+     * When performing an initial load from some source, there is a chance that an update may occur which is logically before
+     * the message sent on the initial load - so this method provides a hook where the query instance can keep track of the version IDs
+     * of the messages sent so that if an older version is sent as an update it can be excluded to avoid going backwards in time.
+     * 
+     * e.g. if the execute() method creates version 2 of an object and then an update message is sent for version 1, this method should return false to 
+     * hide the old update message.
+     * 
+     * @param message the update message which may have been sent before the query actually completed
+     * @return true if the update message is valid otherwise false in which case the update message will be discarded.
+     */
+    public boolean validateUpdate(Message message);
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -21,6 +21,7 @@
 import org.activemq.broker.ConnectionContext;
 import org.activemq.broker.region.MessageReference;
 import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
 
 /**
  * This is the default Topic recovery policy which does not recover any messages.
@@ -31,10 +32,11 @@
  */
 public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
 
-    public void add(ConnectionContext context, MessageReference node) throws Throwable {
+    public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
+        return true;
     }
 
-    public void recover(ConnectionContext context, Subscription sub) throws Throwable {
+    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
     }
 
     public void start() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -18,13 +18,22 @@
  **/
 package org.activemq.broker.region.policy;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
+
 import org.activemq.ActiveMQMessageTransformation;
 import org.activemq.broker.ConnectionContext;
+import org.activemq.broker.region.Destination;
 import org.activemq.broker.region.MessageReference;
 import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
 import org.activemq.command.ActiveMQDestination;
 import org.activemq.command.ActiveMQMessage;
+import org.activemq.command.ConnectionId;
+import org.activemq.command.MessageId;
+import org.activemq.command.ProducerId;
+import org.activemq.command.SessionId;
 import org.activemq.filter.MessageEvaluationContext;
+import org.activemq.util.IdGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -44,18 +53,25 @@
     private static final Log log = LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class);
 
     private MessageQuery query;
+    private AtomicLong messageSequence = new AtomicLong(0);
+    private IdGenerator idGenerator = new IdGenerator();
+    private ProducerId producerId = createProducerId();
+
+    public QueryBasedSubscriptionRecoveryPolicy() {
+    }
 
-    public void add(ConnectionContext context, MessageReference message) throws Throwable {
+    public boolean add(ConnectionContext context, MessageReference message) throws Throwable {
+        return query.validateUpdate(message.getMessage());
     }
 
-    public void recover(ConnectionContext context, final Subscription sub) throws Throwable {
+    public void recover(ConnectionContext context, final Topic topic, final Subscription sub) throws Throwable {
         if (query != null) {
             final MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
             try {
                 ActiveMQDestination destination = sub.getConsumerInfo().getDestination();
                 query.execute(destination, new MessageListener() {
                     public void onMessage(Message message) {
-                        dispatchInitialMessage(message, msgContext, sub);
+                        dispatchInitialMessage(message, topic, msgContext, sub);
                     }
                 });
             }
@@ -66,7 +82,7 @@
     }
 
     public void start() throws Exception {
-        if (query != null) {
+        if (query == null) {
             throw new IllegalArgumentException("No query property configured");
         }
     }
@@ -85,10 +101,17 @@
         this.query = query;
     }
 
-    protected void dispatchInitialMessage(Message message, MessageEvaluationContext msgContext, Subscription sub) {
+    protected void dispatchInitialMessage(Message message,  Destination regionDestination, MessageEvaluationContext msgContext, Subscription sub) {
         try {
             ActiveMQMessage activeMessage = ActiveMQMessageTransformation.transformMessage(message, null);
-            msgContext.setDestination(activeMessage.getDestination());
+            ActiveMQDestination destination = activeMessage.getDestination();
+            if (destination == null) {
+                destination = sub.getConsumerInfo().getDestination();
+                activeMessage.setDestination(destination);
+            }
+            activeMessage.setRegionDestination(regionDestination);
+            configure(activeMessage);
+            msgContext.setDestination(destination);
             msgContext.setMessageReference(activeMessage);
             if (sub.matches(activeMessage, msgContext)) {
                 sub.add(activeMessage);
@@ -97,5 +120,19 @@
         catch (Throwable e) {
             log.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + e, e);
         }
+    }
+
+    protected void configure(ActiveMQMessage msg) {
+        long sequenceNumber = messageSequence.incrementAndGet();
+        msg.setMessageId(new MessageId(producerId, sequenceNumber));
+        msg.onSend();
+        msg.setProducerId(producerId);
+    }
+
+    protected ProducerId createProducerId() {
+        String id = idGenerator.generateId();
+        ConnectionId connectionId = new ConnectionId(id);
+        SessionId sessionId = new SessionId(connectionId, 1);
+        return new ProducerId(sessionId, 1);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -22,6 +22,7 @@
 import org.activemq.broker.ConnectionContext;
 import org.activemq.broker.region.MessageReference;
 import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
 
 /**
  * Abstraction to allow different recovery policies to be plugged
@@ -37,17 +38,20 @@
      * 
      * @param context
      * @param node
+     * @return TODO
      * @throws Throwable
      */
-    void add(ConnectionContext context, MessageReference message) throws Throwable;
+    boolean add(ConnectionContext context, MessageReference message) throws Throwable;
     
     /**
      * Let a subscription recover message held by the policy.
      * 
      * @param context
+     * @param topic TODO
+     * @param topic 
      * @param node
      * @throws Throwable
      */
-    void recover(ConnectionContext context, Subscription sub) throws Throwable;
+    void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -27,6 +27,7 @@
 import org.activemq.broker.ConnectionContext;
 import org.activemq.broker.region.MessageReference;
 import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
 import org.activemq.filter.MessageEvaluationContext;
 import org.activemq.thread.Scheduler;
 
@@ -66,11 +67,12 @@
         }
     };
 
-    public void add(ConnectionContext context, MessageReference message) throws Throwable {
+    public boolean add(ConnectionContext context, MessageReference message) throws Throwable {
         buffer.add(new TimestampWrapper(message, lastGCRun));
+        return true;
     }
 
-    public void recover(ConnectionContext context, Subscription sub) throws Throwable {
+    public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
         
         // Re-dispatch the messages from the buffer.
         ArrayList copy = new ArrayList(buffer);

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java?rev=357732&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
Mon Dec 19 09:43:22 2005
@@ -0,0 +1,47 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.test.retroactive;
+
+import org.activemq.broker.region.policy.MessageQuery;
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ActiveMQTextMessage;
+import org.activemq.command.Message;
+
+import javax.jms.MessageListener;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class DummyMessageQuery implements MessageQuery {
+
+    public static int messageCount = 10;
+    
+    public void execute(ActiveMQDestination destination, MessageListener listener) throws Exception {
+        System.out.println("Initial query is creating: " + messageCount + " messages");
+        for (int i = 0; i < messageCount; i++) {
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setText("Initial message: " + i + " loaded from query");
+            listener.onMessage(message);
+        }
+    }
+
+    public boolean validateUpdate(Message message) {
+        return true;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java?rev=357732&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
Mon Dec 19 09:43:22 2005
@@ -0,0 +1,108 @@
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+package org.activemq.test.retroactive;
+
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.EmbeddedBrokerTestSupport;
+import org.activemq.broker.BrokerService;
+import org.activemq.util.MessageList;
+import org.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.Date;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class RetroactiveConsumerWithMessageQueryTest extends EmbeddedBrokerTestSupport {
+    protected int messageCount = 20;
+    protected Connection connection;
+    protected Session session;
+
+    public void testConsumeAndReceiveInitialQueryBeforeUpdates() throws Exception {
+
+        // lets some messages
+        connection = createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageList listener = new MessageList();
+        listener.setVerbose(true);
+        consumer.setMessageListener(listener);
+
+        MessageProducer producer = session.createProducer(destination);
+        int updateMessageCount = messageCount - DummyMessageQuery.messageCount;
+        for (int i = 0; i < updateMessageCount; i++) {
+            TextMessage message = session.createTextMessage("Update Message: " + i + " sent at: " + new Date());
+            producer.send(message);
+        }
+        producer.close();
+        System.out.println("Sent: " + updateMessageCount + " update messages");
+
+        listener.assertMessagesReceived(messageCount);
+    }
+
+    protected void setUp() throws Exception {
+        useTopic = true;
+        bindAddress = "vm://localhost";
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        if (session != null) {
+            session.close();
+            session = null;
+        }
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(bindAddress);
+        answer.setUseRetroactiveConsumer(true);
+        return answer;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        String uri = getBrokerXml();
+        System.out.println("Loading broker configuration from the classpath with URI: " + uri);
+        BrokerFactoryBean factory = new BrokerFactoryBean(new ClassPathResource(uri));
+        factory.afterPropertiesSet();
+        return factory.getBroker();
+    }
+
+    protected void startBroker() throws Exception {
+        // broker already started by XBean
+    }
+
+    protected String getBrokerXml() {
+        return "org/activemq/test/retroactive/activemq-message-query.xml";
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml?rev=357732&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
Mon Dec 19 09:43:22 2005
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans xmlns="http://activemq.org/config/1.0"
+  xmlns:s="http://xbean.org/spring/">
+
+  <broker persistent="false">
+
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry topic="org.activemq.test.>">
+            <subscriptionRecoveryPolicy>
+              <queryBasedSubscriptionRecoveryPolicy query="#myQuery" />
+            </subscriptionRecoveryPolicy>
+          </policyEntry>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+
+  <bean id="myQuery"
+    class="org.activemq.test.retroactive.DummyMessageQuery" />
+</beans>
+<!-- END SNIPPET: xbean -->

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml



Mime
View raw message