Subject: svn commit: r563194 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/command/ activemq-core/src/main/java/org/apache/activemq/openwire/v3/ activemq-core/src/main/java/o...
Date: Mon, 06 Aug 2007 17:07:57 -0000
To: commits@activemq.apache.org
From: chirino@apache.org
X-Mailer: svnmailer-1.1.0
Message-Id: <20070806170758.4576A1A981A@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: chirino
Date: Mon Aug 6 10:07:55 2007
New Revision: 563194

URL: http://svn.apache.org/viewvc?view=rev&rev=563194
Log:
Fix for AMQ-1356: durable subscriptions do not work with wildcards after the broker is restarted.

 - Added a RecoveryBrokerTest.testWildCardSubscriptionPreservedOnRestart() test case that showed that wildcards did not work with durable subscriptions.
 - Fixed the TransactedTopicMasterSlaveTest so that setDeleteAllMessagesOnStartup() takes effect (it had to be called before the connectors are added).
 - Changed the MessageStore interface so that subscriptions are created using the data in the SubscriptionInfo class.
 - Added a subscribedDestination field to SubscriptionInfo so that the original wildcard subscription can be remembered.
 - The KahaReference store now deletes its State store too when deleteAllMessages() is called.
 - Fixed KahaPersistenceAdapter.getDestinations() so that it actually returns all the destinations.
 - We now recover all the topics eagerly when the topic region is started.
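To illustrate why the log above adds a subscribedDestination field, here is a minimal sketch of the idea. It is not the ActiveMQ code: SubscriptionRecord stands in for SubscriptionInfo, destinations are reduced to plain strings, and matches() is a hypothetical, simplified wildcard matcher invented for this example. The topic names are made up as well.

```java
// Simplified sketch; these are stand-in types, not the real ActiveMQ classes.
class SubscribedDestinationSketch {

    /** Stand-in for SubscriptionInfo, with destinations reduced to plain strings. */
    static final class SubscriptionRecord {
        private final String destination;           // resolved topic, never a pattern
        private final String subscribedDestination; // what the client asked for; may be null

        SubscriptionRecord(String destination, String subscribedDestination) {
            this.destination = destination;
            this.subscribedDestination = subscribedDestination;
        }

        String getDestination() {
            return destination;
        }

        // Same fallback idea as in the commit: a record without a subscribed
        // destination reports the resolved destination instead.
        String getSubscribedDestination() {
            return subscribedDestination == null ? destination : subscribedDestination;
        }
    }

    /**
     * Hypothetical matcher for dot-separated names: "*" matches exactly one
     * element and ">" matches any remaining elements. The broker's real
     * matching logic is more involved.
     */
    static boolean matches(String pattern, String topic) {
        String[] p = pattern.split("\\.");
        String[] t = topic.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true;
            }
            if (i >= t.length) {
                return false;
            }
            if (!p[i].equals("*") && !p[i].equals(t[i])) {
                return false;
            }
        }
        return p.length == t.length;
    }

    public static void main(String[] args) {
        // A durable subscription to "PRICES.*" that was stored against the topic PRICES.IBM.
        SubscriptionRecord stored = new SubscriptionRecord("PRICES.IBM", "PRICES.*");

        // Rebuilding the consumer from the resolved destination loses the wildcard.
        System.out.println(matches(stored.getDestination(), "PRICES.MSFT"));
        // Rebuilding it from the subscribed destination keeps the wildcard.
        System.out.println(matches(stored.getSubscribedDestination(), "PRICES.MSFT"));
    }
}
```

In this sketch the first check is false and the second is true, which is the difference the log describes: a subscription rebuilt after a restart only sees other matching topics if the original pattern was stored alongside the resolved destination.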
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java
    activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Aug 6 10:07:55 2007
@@ -17,10 +17,13 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
 
@@ -36,14 +39,13 @@
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import
org.apache.activemq.command.Response; +import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.memory.UsageManager; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.concurrent.ConcurrentHashMap; - /** * * @version $Revision: 1.14 $ @@ -81,6 +83,16 @@ public void start() throws Exception { started = true; + + Set inactiveDests = getInactiveDestinations(); + for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) { + ActiveMQDestination dest = (ActiveMQDestination) iter.next(); + + ConnectionContext context = new ConnectionContext(); + context.setBroker(broker.getBrokerService().getBroker()); + context.getBroker().addDestination(context , dest); + } + for (Iterator i = destinations.values().iterator();i.hasNext();) { Destination dest = (Destination)i.next(); dest.start(); @@ -110,18 +122,28 @@ dest.start(); destinations.put(destination,dest); destinationMap.put(destination,dest); - // Add all consumers that are interested in the destination. - for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){ - Subscription sub=(Subscription)iter.next(); - if(sub.matches(destination)){ - dest.addSubscription(context,sub); - } - } + addSubscriptionsForDestination(context, dest); } return dest; } } + protected List addSubscriptionsForDestination(ConnectionContext context, + Destination dest) throws Exception { + + ArrayList rc = new ArrayList(); + // Add all consumers that are interested in the destination. 
+ for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){ + Subscription sub=(Subscription)iter.next(); + if(sub.matches(dest.getActiveMQDestination())){ + dest.addSubscription(context,sub); + rc.add(sub); + } + } + return rc; + + } + public void removeDestination(ConnectionContext context,ActiveMQDestination destination,long timeout) throws Exception{ @@ -205,7 +227,6 @@ return (Subscription)o; } - Subscription sub = createSubscription(context, info); // We may need to add some destinations that are in persistent store but not active // in the broker. @@ -216,14 +237,9 @@ // eagerly load all destinations into the broker but have an inactive state for the // destination which has reduced memory usage. // - Set inactiveDests = getInactiveDestinations(); - for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) { - ActiveMQDestination dest = (ActiveMQDestination) iter.next(); - if( sub.matches(dest) ) { - context.getBroker().addDestination(context, dest); - } - } + DestinationFilter destinationFilter = DestinationFilter.parseFilter(info.getDestination()); + Subscription sub = createSubscription(context, info); subscriptions.put(info.getConsumerId(), sub); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Aug 6 10:07:55 2007 @@ -27,6 +27,8 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.store.MessageStore; +import 
org.apache.activemq.store.TopicMessageStore; /** * @@ -53,4 +55,5 @@ public Message[] browse(); public String getName(); + public MessageStore getMessageStore(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Aug 6 10:07:55 2007 @@ -25,6 +25,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.store.MessageStore; import java.io.IOException; import java.util.Iterator; @@ -114,4 +115,8 @@ dest.send(context, message); } } + + public MessageStore getMessageStore() { + return next.getMessageStore(); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Aug 6 10:07:55 2007 @@ -425,7 +425,7 @@ context.getConnection().dispatchAsync(md); }else{ context.getConnection().dispatchSync(md); - onDispatch(node,message); + onDispatch(node,message); } //System.err.println(broker.getBrokerName() 
+ " " + this + " (" + enqueueCounter + ", " + dispatchCounter +") " + node); return true; @@ -439,11 +439,13 @@ if(node!=QueueMessageReference.NULL_MESSAGE){ node.getRegionDestination().getDestinationStatistics().getDispatched().increment(); } - try{ - dispatchMatched(); - }catch(IOException e){ - context.getConnection().serviceExceptionAsync(e); - } + } + if( info.isDispatchAsync() ) { + try{ + dispatchMatched(); + }catch(IOException e){ + context.getConnection().serviceExceptionAsync(e); + } } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Aug 6 10:07:55 2007 @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -183,7 +184,13 @@ } // Do we need to create the subscription? if (info == null) { - store.addSubsciption(clientId, subscriptionName, selector, subscription.getConsumerInfo().isRetroactive()); + info = new SubscriptionInfo(); + info.setClientId(clientId); + info.setSelector(selector); + info.setSubscriptionName(subscriptionName); + info.setDestination(getActiveMQDestination()); // This destination is an actual destination id. 
+ info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This destination might be a pattern + store.addSubsciption(info, subscription.getConsumerInfo().isRetroactive()); } final MessageEvaluationContext msgContext = new MessageEvaluationContext(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Aug 6 10:07:55 2007 @@ -18,7 +18,9 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; import javax.jms.InvalidDestinationException; @@ -148,19 +150,14 @@ return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%"; } - - // Implementation methods - // ------------------------------------------------------------------------- - protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - Topic topic = (Topic) super.createDestination(context, destination); - - recoverDurableSubscriptions(context, topic); - - return topic; - } - - private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException, JMSException, Exception { - TopicMessageStore store = (TopicMessageStore) topic.getMessageStore(); + + @Override + protected List addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception { + + List rc = 
super.addSubscriptionsForDestination(context, dest); + HashSet dupChecker = new HashSet(rc); + + TopicMessageStore store = (TopicMessageStore) dest.getMessageStore(); // Eagerly recover the durable subscriptions if (store != null) { SubscriptionInfo[] infos = store.getAllSubscriptions(); @@ -181,16 +178,40 @@ sub = (DurableTopicSubscription) createSubscription(c, consumerInfo ); } - topic.addSubscription(context, sub); - } + if( dupChecker.contains(sub ) ) { + continue; + } + + dupChecker.add(sub); + rc.add(sub); + dest.addSubscription(context, sub); + } + + // Now perhaps there other durable subscriptions (via wild card) that would match this destination.. + durableSubscriptions.values(); + for (Iterator iterator = durableSubscriptions.values().iterator(); iterator + .hasNext();) { + DurableTopicSubscription sub = (DurableTopicSubscription) iterator.next(); + // Skip over subscriptions that we allready added.. + if( dupChecker.contains(sub ) ) { + continue; + } + + if( sub.matches(dest.getActiveMQDestination()) ) { + rc.add(sub); + dest.addSubscription(context, sub); + } + } } + + return rc; } private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { ConsumerInfo rc = new ConsumerInfo(); rc.setSelector(info.getSelector()); rc.setSubscriptionName(info.getSubscriptionName()); - rc.setDestination(info.getDestination()); + rc.setDestination(info.getSubscribedDestination()); rc.setConsumerId(createConsumerId()); return rc; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original) +++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Mon Aug 6 10:07:55 2007 @@ -21,6 +21,7 @@ /** + * Used to represent a durable subscription. * * @openwire:marshaller code="55" * @version $Revision: 1.6 $ @@ -29,6 +30,7 @@ public static final byte DATA_STRUCTURE_TYPE=CommandTypes.DURABLE_SUBSCRIPTION_INFO; + protected ActiveMQDestination subscribedDestination; protected ActiveMQDestination destination; protected String clientId; protected String subscriptionName; @@ -50,6 +52,9 @@ } /** + * This is the a resolved destination that the subscription is receiving messages from. + * This will never be a pattern or a composite destination. + * * @openwire:property version=1 cache=true */ public ActiveMQDestination getDestination() { @@ -120,5 +125,24 @@ } return result; } + + /** + * The destination the client originally subscribed to.. This may not match the {@see getDestination} method + * if the subscribed destination uses patterns or composites. + * + * If the subscribed destinationis not set, this just ruturns the desitination. 
+ * + * @openwire:property version=3 + */ + public ActiveMQDestination getSubscribedDestination() { + if( subscribedDestination == null ) { + return getDestination(); + } + return subscribedDestination; + } + + public void setSubscribedDestination(ActiveMQDestination subscribedDestination) { + this.subscribedDestination = subscribedDestination; + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/SubscriptionInfoMarshaller.java Mon Aug 6 10:07:55 2007 @@ -70,6 +70,7 @@ info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs)); info.setSelector(tightUnmarshalString(dataIn, bs)); info.setSubcriptionName(tightUnmarshalString(dataIn, bs)); + info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalNestedObject(wireFormat, dataIn, bs)); } @@ -86,6 +87,7 @@ rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getDestination(), bs); rc += tightMarshalString1(info.getSelector(), bs); rc += tightMarshalString1(info.getSubcriptionName(), bs); + rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getSubscribedDestination(), bs); return rc + 0; } @@ -105,6 +107,7 @@ tightMarshalCachedObject2(wireFormat, (DataStructure)info.getDestination(), dataOut, bs); tightMarshalString2(info.getSelector(), dataOut, bs); tightMarshalString2(info.getSubcriptionName(), dataOut, bs); + tightMarshalNestedObject2(wireFormat, 
(DataStructure)info.getSubscribedDestination(), dataOut, bs); } @@ -123,6 +126,7 @@ info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn)); info.setSelector(looseUnmarshalString(dataIn)); info.setSubcriptionName(looseUnmarshalString(dataIn)); + info.setSubscribedDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalNestedObject(wireFormat, dataIn)); } @@ -139,6 +143,7 @@ looseMarshalCachedObject(wireFormat, (DataStructure)info.getDestination(), dataOut); looseMarshalString(info.getSelector(), dataOut); looseMarshalString(info.getSubcriptionName(), dataOut); + looseMarshalNestedObject(wireFormat, (DataStructure)info.getSubscribedDestination(), dataOut); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -71,8 +71,8 @@ throws IOException { delegate.acknowledge(context, clientId, subscriptionName, messageId); } - public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { - delegate.addSubsciption(clientId, subscriptionName, selector, retroactive); + public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { + delegate.addSubsciption(subscriptionInfo, retroactive); } public void deleteSubscription(String clientId, String subscriptionName) throws IOException { delegate.deleteSubscription(clientId, subscriptionName); Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -106,7 +106,7 @@ public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException; /** - * Lists all the durable subscirptions for a given destination. + * Lists all the durable subscriptions for a given destination. * * @return an array SubscriptionInfos * @throws IOException @@ -126,6 +126,6 @@ * @throws IOException * */ - public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) + public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Mon Aug 6 10:07:55 2007 @@ -132,6 +132,6 @@ * @throws IOException * */ - public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) + public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive) throws IOException; } Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -70,9 +70,9 @@ return topicReferenceStore.lookupSubscription(clientId,subscriptionName); } - public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) + public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive) throws IOException{ - topicReferenceStore.addSubsciption(clientId,subscriptionName,selector,retroactive); + topicReferenceStore.addSubsciption(subscriptionInfo,retroactive); } /** Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Aug 6 10:07:55 2007 @@ -56,8 +56,7 @@ public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception; - public abstract void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId, - String 
subscriptionName,String selector,boolean retroactive) throws SQLException,IOException; + public abstract void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo subscriptionInfo,boolean retroactive) throws SQLException,IOException; public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination, String clientId,String subscriptionName) throws SQLException,IOException; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -138,16 +138,16 @@ * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo, * boolean) */ - public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) + public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { TransactionContext c = persistenceAdapter.getTransactionContext(); try { c = persistenceAdapter.getTransactionContext(); - adapter.doSetSubscriberEntry(c, destination, clientId, subscriptionName, selector, retroactive); + adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive); } catch (SQLException e) { JDBCPersistenceAdapter.log("JDBC Failure: ",e); throw IOExceptionSupport - .create("Failed to lookup subscription for info: " + clientId + ". Reason: " + e, e); + .create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". 
Reason: " + e, e); } finally { c.close(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Mon Aug 6 10:07:55 2007 @@ -84,6 +84,7 @@ "CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName() + " (EXPIRATION)", "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType + " NOT NULL" + + ", SUB_DEST " + stringIdDataType + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", @@ -165,14 +166,14 @@ public String getCreateDurableSubStatement() { if (createDurableSubStatement == null) { createDurableSubStatement = "INSERT INTO " + getFullAckTableName() - + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID) " + "VALUES (?, ?, ?, ?, ?)"; + + "(CONTAINER, CLIENT_ID, SUB_NAME, SELECTOR, LAST_ACKED_ID, SUB_DEST) " + "VALUES (?, ?, ?, ?, ?, ?)"; } return createDurableSubStatement; } public String getFindDurableSubStatement() { if (findDurableSubStatement == null) { - findDurableSubStatement = "SELECT SELECTOR, SUB_NAME " + "FROM " + getFullAckTableName() + findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName() + " WHERE CONTAINER=? AND CLIENT_ID=? 
AND SUB_NAME=?"; } return findDurableSubStatement; @@ -180,7 +181,7 @@ public String getFindAllDurableSubsStatement() { if (findAllDurableSubsStatement == null) { - findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID" + " FROM " + getFullAckTableName() + findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST" + " FROM " + getFullAckTableName() + " WHERE CONTAINER=?"; } return findAllDurableSubsStatement; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/AxionJDBCAdapter.java Mon Aug 6 10:07:55 2007 @@ -48,6 +48,7 @@ "CREATE INDEX "+statements.getFullMessageTableName()+"_EIDX ON "+statements.getFullMessageTableName()+" (EXPIRATION)", "CREATE TABLE "+statements.getFullAckTableName()+"(" +"CONTAINER "+statements.getContainerNameDataType()+" NOT NULL" + +", SUB_DEST " + statements.getContainerNameDataType() +", CLIENT_ID "+statements.getStringIdDataType()+" NOT NULL" +", SUB_NAME "+statements.getStringIdDataType()+" NOT NULL" +", SELECTOR "+statements.getStringIdDataType() Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Aug 6 10:07:55 2007 @@ -431,8 +431,7 @@ * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, * org.apache.activemq.service.SubscriptionInfo) */ - public void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId, - String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException{ + public void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo info,boolean retroactive) throws SQLException,IOException{ // dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName); PreparedStatement s=null; try{ @@ -451,13 +450,14 @@ } } s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement()); - s.setString(1,destination.getQualifiedName()); - s.setString(2,clientId); - s.setString(3,subscriptionName); - s.setString(4,selector); + s.setString(1,info.getDestination().getQualifiedName()); + s.setString(2,info.getClientId()); + s.setString(3,info.getSubscriptionName()); + s.setString(4,info.getSelector()); s.setLong(5,lastMessageId); + s.setString(6, info.getSubscribedDestination().getQualifiedName()); if(s.executeUpdate()!=1){ - throw new IOException("Could not create durable subscription for: "+clientId); + throw new IOException("Could not create durable subscription for: "+info.getClientId()); } }finally{ close(s); @@ -480,8 +480,9 @@ SubscriptionInfo subscription=new SubscriptionInfo(); subscription.setDestination(destination); subscription.setClientId(clientId); - subscription.setSubcriptionName(subscriptionName); + subscription.setSubscriptionName(subscriptionName); subscription.setSelector(rs.getString(1)); + 
subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), ActiveMQDestination.QUEUE_TYPE)); return subscription; }finally{ close(rs); @@ -502,8 +503,9 @@ SubscriptionInfo subscription=new SubscriptionInfo(); subscription.setDestination(destination); subscription.setSelector(rs.getString(1)); - subscription.setSubcriptionName(rs.getString(2)); + subscription.setSubscriptionName(rs.getString(2)); subscription.setClientId(rs.getString(3)); + subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),ActiveMQDestination.QUEUE_TYPE)); rc.add(subscription); } return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -68,9 +68,9 @@ return longTermStore.lookupSubscription(clientId, subscriptionName); } - public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { + public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException { this.peristenceAdapter.checkpoint(true, true); - longTermStore.addSubsciption(clientId, subscriptionName, selector, retroactive); + longTermStore.addSubsciption(subscriptionInfo, retroactive); } public void addMessage(ConnectionContext context, Message message) throws IOException { Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Mon Aug 6 10:07:55 2007 @@ -27,6 +27,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; import org.apache.activemq.kaha.CommandMarshaller; +import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; @@ -71,7 +72,8 @@ try{ Store store=getStore(); for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ - Object obj=i.next(); + ContainerId id=(ContainerId) i.next(); + Object obj = id.getKey(); if(obj instanceof ActiveMQDestination){ rc.add((ActiveMQDestination)obj); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Aug 6 10:07:55 2007 @@ -211,7 +211,7 @@ for(Iterator i=durableSubscribers.iterator();i.hasNext();){ 
SubscriptionInfo info=(SubscriptionInfo)i.next(); TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination()); - ts.addSubsciption(info.getClientId(),info.getSubscriptionName(),info.getSelector(),false); + ts.addSubsciption(info,false); } } @@ -247,6 +247,20 @@ this.stateStore=createStateStore(getDirectory()); } return this.stateStore; + } + + public void deleteAllMessages() throws IOException{ + super.deleteAllMessages(); + if(stateStore!=null){ + if(stateStore.isInitialized()){ + stateStore.clear(); + }else{ + stateStore.delete(); + } + }else{ + File stateDirectory=new File(getDirectory(),"kr-state"); + StoreFactory.delete(stateDirectory.getAbsolutePath()); + } } private Store createStateStore(File directory){ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -106,14 +106,9 @@ return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName)); } - public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive) + public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive) throws IOException{ - SubscriptionInfo info=new SubscriptionInfo(); - info.setDestination(destination); - info.setClientId(clientId); - info.setSelector(selector); - info.setSubcriptionName(subscriptionName); - String key=getSubscriptionKey(clientId,subscriptionName); + String 
key=getSubscriptionKey(info.getClientId(),info.getSubscriptionName()); // if already exists - won't add it again as it causes data files // to hang around if(!subscriberContainer.containsKey(key)){ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Aug 6 10:07:55 2007 @@ -18,6 +18,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; + import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -46,7 +47,7 @@ subscriberContainer=subsContainer; // load all the Ack containers for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){ - Object key=i.next(); + String key=(String) i.next(); addSubscriberMessageContainer(key); } } @@ -102,8 +103,8 @@ } } - protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{ - ListContainer container=store.getListContainer(key,"topic-subs-references"); + protected ListContainer addSubscriberMessageContainer(String key) throws IOException{ + ListContainer container=store.getListContainer(destination,"topic-subs-references-"+key); Marshaller marshaller=new ConsumerMessageRefMarshaller(); container.setMarshaller(marshaller); TopicSubContainer tsc=new TopicSubContainer(container); @@ -141,14 +142,9 @@ } } - public synchronized void addSubsciption(String clientId,String subscriptionName,String 
selector,boolean retroactive) + public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive) throws IOException{ - SubscriptionInfo info=new SubscriptionInfo(); - info.setDestination(destination); - info.setClientId(clientId); - info.setSelector(selector); - info.setSubcriptionName(subscriptionName); - String key=getSubscriptionKey(clientId,subscriptionName); + String key=getSubscriptionKey(info.getClientId(), info.getSubscriptionName()); // if already exists - won't add it again as it causes data files // to hang around if(!subscriberContainer.containsKey(key)){ @@ -253,7 +249,7 @@ } } - protected void removeSubscriberMessageContainer(Object key) throws IOException{ + protected void removeSubscriberMessageContainer(String key) throws IOException{ subscriberContainer.remove(key); TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key); for(Iterator i=container.iterator();i.hasNext();){ @@ -270,7 +266,7 @@ } } } - store.deleteListContainer(key,"topic-subs-references"); + store.deleteListContainer(destination,"topic-subs-references-"+key); } protected String getSubscriptionKey(String clientId,String subscriberName){ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -73,14 +73,9 @@ return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName)); } - public synchronized void addSubsciption(String clientId,String 
subscriptionName,String selector,boolean retroactive) + public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive) throws IOException{ - SubscriptionInfo info=new SubscriptionInfo(); - info.setDestination(destination); - info.setClientId(clientId); - info.setSelector(selector); - info.setSubcriptionName(subscriptionName); - SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName); + SubscriptionKey key=new SubscriptionKey(info); MemoryTopicSub sub=new MemoryTopicSub(); topicSubMap.put(key,sub); if(retroactive){ Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java Mon Aug 6 10:07:55 2007 @@ -17,6 +17,8 @@ */ package org.apache.activemq.broker; +import java.util.ArrayList; + import javax.jms.DeliveryMode; import junit.framework.Test; @@ -29,6 +31,7 @@ import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; @@ -39,6 +42,113 @@ */ public class RecoveryBrokerTest extends BrokerRestartTestSupport { + /** + * Used to verify that after a broker restart durable subscriptions that + * use wild cards are still wild card subscription after broker restart. 
+ * + * @throws Exception + */ + public void testWildCardSubscriptionPreservedOnRestart() throws Exception { + ActiveMQDestination dest1 = new ActiveMQTopic("TEST.A"); + ActiveMQDestination dest2 = new ActiveMQTopic("TEST.B"); + ActiveMQDestination dest3 = new ActiveMQTopic("TEST.C"); + ActiveMQDestination wildDest = new ActiveMQTopic("TEST.>"); + + ArrayList sentBeforeRestart = new ArrayList(); + ArrayList sentBeforeCreateConsumer = new ArrayList(); + ArrayList sentAfterCreateConsumer = new ArrayList(); + + // Setup a first connection + { + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + connectionInfo1.setClientId("A"); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo1); + + // Create the durable subscription. + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, wildDest); + consumerInfo1.setSubscriptionName("test"); + consumerInfo1.setPrefetchSize(100); + connection1.send(consumerInfo1); + + // Close the subscription. + connection1.send(closeConsumerInfo(consumerInfo1)); + + // Send the messages + for( int i=0; i < 4; i++) { + Message m = createMessage(producerInfo1, dest1, DeliveryMode.PERSISTENT); + connection1.send(m); + sentBeforeRestart.add(m.getMessageId()); + } + connection1.request(closeConnectionInfo(connectionInfo1)); + connection1.stop(); + } + + // Restart the broker. + restartBroker(); + + // Get a connection to the new broker. 
+ { + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + connectionInfo2.setClientId("A"); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + connection2.send(producerInfo2); + + // Send messages before the durable subscription is re-activated. + for( int i=0; i < 4; i++) { + Message m = createMessage(producerInfo2, dest2, DeliveryMode.PERSISTENT); + connection2.send(m); + sentBeforeCreateConsumer.add(m.getMessageId()); + } + + // Re-open the subscription. + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, wildDest); + consumerInfo2.setSubscriptionName("test"); + consumerInfo2.setPrefetchSize(100); + connection2.send(consumerInfo2); + + // Send messages after the subscription is activated. + for( int i=0; i < 4; i++) { + Message m = createMessage(producerInfo2, dest3, DeliveryMode.PERSISTENT); + connection2.send(m); + sentAfterCreateConsumer.add(m.getMessageId()); + } + + // We should get the recovered messages... + for( int i=0; i < 4 ; i++ ) { + Message m2 = receiveMessage(connection2); + assertNotNull("Recovered message missing: "+i, m2); + assertEquals(sentBeforeRestart.get(i), m2.getMessageId()); + } + + // We should get get the messages that were sent before the sub was reactivated. + for( int i=0; i < 4 ; i++ ) { + Message m2 = receiveMessage(connection2); + assertNotNull("Before activated message missing: "+i, m2); + assertEquals(sentBeforeCreateConsumer.get(i), m2.getMessageId()); + } + + // We should get get the messages that were sent after the sub was reactivated. 
+ for( int i=0; i < 4 ; i++ ) { + Message m2 = receiveMessage(connection2); + assertNotNull("After activated message missing: "+i, m2); + assertEquals(""+i, sentAfterCreateConsumer.get(i), m2.getMessageId()); + } + + assertNoMessagesLeft(connection2); + } + + } + public void testConsumedQueuePersistentMessagesLostOnRestart() throws Exception { ActiveMQDestination destination = new ActiveMQQueue("TEST"); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java Mon Aug 6 10:07:55 2007 @@ -39,9 +39,9 @@ broker.start(); slave = new BrokerService(); slave.setBrokerName("slave"); - slave.addConnector("tcp://localhost:62002"); slave.setDeleteAllMessagesOnStartup(true); slave.setMasterConnectorURI("tcp://localhost:62001"); + slave.addConnector("tcp://localhost:62002"); slave.start(); // wait for thing to connect Thread.sleep(1000); @@ -62,8 +62,8 @@ protected BrokerService createBroker() throws Exception,URISyntaxException{ BrokerService broker=new BrokerService(); broker.setBrokerName("master"); - broker.addConnector("tcp://localhost:62001"); broker.setDeleteAllMessagesOnStartup(true); + broker.addConnector("tcp://localhost:62001"); return broker; } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v3/SubscriptionInfoTest.java Mon Aug 6 10:07:55 2007 @@ -55,5 +55,6 @@ info.setDestination(createActiveMQDestination("Destination:2")); info.setSelector("Selector:3"); info.setSubcriptionName("SubcriptionName:4"); + info.setSubscribedDestination(createActiveMQDestination("SubscribedDestination:5")); } } Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java (original) +++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicMessageStore.java Mon Aug 6 10:07:55 2007 @@ -59,14 +59,15 @@ adapter.commitEntityManager(context,manager); } - public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { + public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { EntityManager manager = adapter.beginEntityManager(null); try { StoredSubscription ss = new StoredSubscription(); - ss.setClientId(clientId); - ss.setSubscriptionName(subscriptionName); + ss.setClientId(info.getClientId()); + ss.setSubscriptionName(info.getSubscriptionName()); ss.setDestination(destinationName); - 
ss.setSelector(selector); + ss.setSelector(info.getSelector()); + ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName()); ss.setLastAckedId(-1); if( !retroactive ) { @@ -125,7 +126,8 @@ info.setClientId(ss.getClientId()); info.setDestination(destination); info.setSelector(ss.getSelector()); - info.setSubcriptionName(ss.getSubscriptionName()); + info.setSubscriptionName(ss.getSubscriptionName()); + info.setSubscribedDestination(toSubscribedDestination(ss)); l.add(info); } @@ -171,7 +173,8 @@ rc.setClientId(ss.getClientId()); rc.setDestination(destination); rc.setSelector(ss.getSelector()); - rc.setSubcriptionName(ss.getSubscriptionName()); + rc.setSubscriptionName(ss.getSubscriptionName()); + rc.setSubscribedDestination(toSubscribedDestination(ss)); } } catch (Throwable e) { adapter.rollbackEntityManager(null,manager); @@ -179,6 +182,12 @@ } adapter.commitEntityManager(null,manager); return rc; + } + + private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) { + if( ss.getSubscribedDestination() == null ) + return null; + return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE); } public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java?view=diff&rev=563194&r1=563193&r2=563194 ============================================================================== --- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java (original) +++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPATopicReferenceStore.java Mon Aug 6 10:07:55 2007 @@ -57,14 +57,15 @@ 
adapter.commitEntityManager(context,manager); } - public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException { + public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { EntityManager manager = adapter.beginEntityManager(null); try { StoredSubscription ss = new StoredSubscription(); - ss.setClientId(clientId); - ss.setSubscriptionName(subscriptionName); + ss.setClientId(info.getClientId()); + ss.setSubscriptionName(info.getSubcriptionName()); ss.setDestination(destinationName); - ss.setSelector(selector); + ss.setSelector(info.getSelector()); + ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName()); ss.setLastAckedId(-1); if( !retroactive ) { @@ -123,7 +124,8 @@ info.setClientId(ss.getClientId()); info.setDestination(destination); info.setSelector(ss.getSelector()); - info.setSubcriptionName(ss.getSubscriptionName()); + info.setSubscriptionName(ss.getSubscriptionName()); + info.setSubscribedDestination(toSubscribedDestination(ss)); l.add(info); } @@ -136,6 +138,12 @@ adapter.commitEntityManager(null,manager); return rc; } + + private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) { + if( ss.getSubscribedDestination() == null ) + return null; + return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE); + } public int getMessageCount(String clientId, String subscriptionName) throws IOException { Long rc; @@ -169,7 +177,8 @@ rc.setClientId(ss.getClientId()); rc.setDestination(destination); rc.setSelector(ss.getSelector()); - rc.setSubcriptionName(ss.getSubscriptionName()); + rc.setSubscriptionName(ss.getSubscriptionName()); + rc.setSubscribedDestination(toSubscribedDestination(ss)); } } catch (Throwable e) { adapter.rollbackEntityManager(null,manager); Modified: activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java?view=diff&rev=563194&r1=563193&r2=563194
==============================================================================
--- activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java (original)
+++ activemq/trunk/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/model/StoredSubscription.java Mon Aug 6 10:07:55 2007
@@ -106,7 +106,8 @@
     private long lastAckedId;
     @Basic
     private String selector;
-
+    @Basic
+    private String subscribedDestination;
 
     public long getLastAckedId() {
         return lastAckedId;
@@ -154,5 +155,13 @@
 
     public void setId(long id) {
         this.id = id;
+    }
+
+    public String getSubscribedDestination() {
+        return subscribedDestination;
+    }
+
+    public void setSubscribedDestination(String subscribedDestination) {
+        this.subscribedDestination = subscribedDestination;
     }
 }
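
For readers of this commit, the following is a minimal, self-contained sketch of why the stores now keep the subscribed destination alongside the concrete one. It is not ActiveMQ code: the class WildcardRecoverySketch, its methods, and the simplified matching rules ("*" for exactly one segment, ">" for one or more trailing segments) are assumptions made for illustration only. It shows that a subscription recovered from the concrete topic name alone (for example TEST.A) no longer matches TEST.B or TEST.C, whereas one recovered from the remembered wildcard (TEST.>) still does, which is the behaviour testWildCardSubscriptionPreservedOnRestart() checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified stand-in for wildcard matching on recovery.
 * Not ActiveMQ's own destination-matching implementation.
 */
public class WildcardRecoverySketch {

    /**
     * Simplified matcher (an assumption of this sketch): segments are
     * separated by ".", "*" matches exactly one segment, and ">" matches
     * one or more trailing segments.
     */
    public static boolean matches(String pattern, String topic) {
        String[] p = pattern.split("\\.");
        String[] t = topic.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" needs at least one remaining topic segment to consume.
                return i < t.length;
            }
            if (i >= t.length) {
                return false; // pattern is longer than the topic name
            }
            if (!p[i].equals("*") && !p[i].equals(t[i])) {
                return false; // literal segment mismatch
            }
        }
        // No ">" seen: every segment must have been consumed.
        return p.length == t.length;
    }

    /** Topics a recovered subscription would match, given the name the store remembered. */
    public static List<String> recoveredTopics(String rememberedDestination, List<String> topics) {
        List<String> rc = new ArrayList<String>();
        for (String topic : topics) {
            if (matches(rememberedDestination, topic)) {
                rc.add(topic);
            }
        }
        return rc;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("TEST.A", "TEST.B", "TEST.C", "OTHER.A");
        // Only the concrete topic was stored: the wildcard is lost on restart.
        System.out.println(recoveredTopics("TEST.A", topics));
        // The original wildcard was stored: all TEST.* topics still match.
        System.out.println(recoveredTopics("TEST.>", topics));
    }
}
```

Note that SUB_DEST is declared without NOT NULL in the JDBC schema above, so rows written before this change may carry no subscribed destination; the JPA stores guard for that case in toSubscribedDestination(), and a caller of a sketch like this one would likewise need to fall back to the concrete destination when the remembered value is null.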