activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r381644 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ memory/list/ store/ store/jdbc/ store/jdbc/adapter/ store/journal/ store/memory/
Date Tue, 28 Feb 2006 12:55:27 GMT
Author: rajdavies
Date: Tue Feb 28 04:55:21 2006
New Revision: 381644

URL: http://svn.apache.org/viewcvs?rev=381644&view=rev
Log:
Added support for browsing Topics

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Tue Feb 28 04:55:21 2006
@@ -1,111 +1,39 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.broker.jmx;
 
 import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
-import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.Message;
-
-public class QueueView implements QueueViewMBean {
-
-    private final Queue destination;
-
-    public QueueView(Queue destination) {
-        this.destination = destination;
+public class QueueView extends DestinationView implements QueueViewMBean{
+    public QueueView(Queue destination){
+        super(destination);
     }
 
-    public void gc() {
-        destination.gc();
-    }
-    public void resetStatistics() {
-        destination.getDestinationStatistics().reset();
-    }
-
-    public long getEnqueueCount() {
-        return destination.getDestinationStatistics().getEnqueues().getCount();
-    
-    }
-    public long getDequeueCount() {
-        return destination.getDestinationStatistics().getDequeues().getCount();
-    }
-
-    public long getConsumerCount() {
-        return destination.getDestinationStatistics().getConsumers().getCount();
-    }
-    
-    public long getMessages() {
-        return destination.getDestinationStatistics().getMessages().getCount();
-    }
-    
-    public long getMessagesCached() {
-        return destination.getDestinationStatistics().getMessagesCached().getCount();
-    }
-    
-    public CompositeData[] browse() throws OpenDataException {
-        Message[] messages = destination.browse();
-        CompositeData c[] = new CompositeData[messages.length];
-        for (int i = 0; i < c.length; i++) {
-            try {
-                c[i] = OpenTypeSupport.convert(messages[i]);
-            } catch (Throwable e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        }
-        return c;
-    }
-    
-    public TabularData browseAsTable() throws OpenDataException {
-        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
-        
-        Message[] messages = destination.browse();
-        CompositeType ct = factory.getCompositeType();
-        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[]{"JMSMessageID"});
-        TabularDataSupport rc = new TabularDataSupport(tt);
-        for (int i = 0; i < messages.length; i++) {
-            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
-        }
-        
-        return rc;
-    }
-
-    
-    public CompositeData getMessage(String messageId) throws OpenDataException {
-        Message rc = destination.getMessage(messageId);
-        if( rc ==null )
+    public CompositeData getMessage(String messageId) throws OpenDataException{
+        Message rc=((Queue) destination).getMessage(messageId);
+        if(rc==null)
             return null;
         return OpenTypeSupport.convert(rc);
     }
-    
-    public void removeMessage(String messageId) {
-        destination.removeMessage(messageId);
+
+    public void removeMessage(String messageId){
+        ((Queue) destination).removeMessage(messageId);
     }
 
-    public void purge() {
-        destination.purge();
+    public void purge(){
+        ((Queue) destination).purge();
     }
-    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicView.java Tue Feb 28 04:55:21 2006
@@ -1,56 +1,22 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.broker.jmx;
 
 import org.apache.activemq.broker.region.Topic;
-
-public class TopicView implements TopicViewMBean {
-
-    private final Topic destination;
-
-    public TopicView(Topic destination) {
-        this.destination = destination;
-    }
-
-    public void gc() {
-        destination.gc();
-    }
-    public void resetStatistics() {
-        destination.getDestinationStatistics().reset();
-    }
-
-    public long getEnqueueCount() {
-        return destination.getDestinationStatistics().getEnqueues().getCount();
+public class TopicView extends DestinationView implements TopicViewMBean{
     
+    public TopicView(Topic destination){
+        super(destination);
     }
-    public long getDequeueCount() {
-        return destination.getDestinationStatistics().getDequeues().getCount();
-    }
-
-    public long getConsumerCount() {
-        return destination.getDestinationStatistics().getConsumers().getCount();
-    }
-    
-    public long getMessages() {
-        return destination.getDestinationStatistics().getMessages().getCount();
-    }
-    
-    public long getMessagesCached() {
-        return destination.getDestinationStatistics().getMessagesCached().getCount();
-    }
-    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/TopicViewMBean.java Tue Feb 28 04:55:21 2006
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+
 
 public interface TopicViewMBean {
     
@@ -27,5 +31,8 @@
     public long getConsumerCount();
     public long getMessages();
     public long getMessagesCached();
+    
+    public CompositeData[] browse() throws OpenDataException;
+    public TabularData browseAsTable() throws OpenDataException;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Tue Feb 28 04:55:21 2006
@@ -52,4 +52,6 @@
     DestinationStatistics getDestinationStatistics();
     MessageStore getMessageStore();
     DeadLetterStrategy getDeadLetterStrategy();
+    
+    public Message[] browse();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Feb 28 04:55:21 2006
@@ -98,6 +98,9 @@
                 public void recoverMessageReference(String messageReference) throws Throwable {
                     throw new RuntimeException("Should not be called.");
                 }
+                
+                public void finished(){
+                }
             });
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Feb 28 04:55:21 2006
@@ -17,7 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-
+import java.util.Set;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -41,9 +41,11 @@
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.SubscriptionKey;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
 
 /**
  * The Topic is a destination that sends a copy of a message to every active
@@ -52,7 +54,7 @@
  * @version $Revision: 1.21 $
  */
 public class Topic implements Destination {
-
+    private static final Log log = LogFactory.getLog(Topic.class);
     protected final ActiveMQDestination destination;
     protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
     protected final Valve dispatchValve = new Valve(true);
@@ -196,6 +198,9 @@
                 public void recoverMessageReference(String messageReference) throws Throwable {
                     throw new RuntimeException("Should not be called.");
                 }
+                
+                public void finished(){
+                }
             });
             
             if( true && subscription.getConsumerInfo().isRetroactive() ) {
@@ -289,6 +294,30 @@
 
     public void stop() throws Exception {
         this.subscriptionRecoveryPolicy.stop();
+    }
+    
+    public Message[] browse(){
+        final Set result=new CopyOnWriteArraySet();
+        try{
+            store.recover(new MessageRecoveryListener(){
+                public void recoverMessage(Message message) throws Throwable{
+                    result.add(message);
+                }
+
+                public void recoverMessageReference(String messageReference) throws Throwable{}
+
+                public void finished(){}
+            });
+            Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
+            if(msgs!=null){
+                for(int i=0;i<msgs.length;i++){
+                    result.add(msgs[i]);
+                }
+            }
+        }catch(Throwable e){
+            log.warn("Failed to browse Topic: "+getActiveMQDestination().getPhysicalName(),e);
+        }
+        return (Message[]) result.toArray(new Message[result.size()]);
     }
 
     // Properties

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Tue Feb 28 04:55:21 2006
@@ -1,96 +1,112 @@
 /**
- *
+ * 
  * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.activemq.broker.region.policy;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
-
 /**
- * This implementation of {@link SubscriptionRecoveryPolicy} will only keep 
- * the last message.
+ * This implementation of {@link SubscriptionRecoveryPolicy} will only keep the last message.
  * 
  * @org.apache.xbean.XBean
  * 
  * @version $Revision$
  */
-public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
-
+public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy{
     volatile private MessageReference messages[];
     private int maximumSize=100;
     private int tail=0;
 
-    synchronized public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
-        messages[tail++] = node;
-        if( tail >= messages.length )
-            tail = 0;
+    synchronized public boolean add(ConnectionContext context,MessageReference node) throws Throwable{
+        messages[tail++]=node;
+        if(tail>=messages.length)
+            tail=0;
         return true;
     }
 
-    synchronized public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
+    synchronized public void recover(ConnectionContext context,Topic topic,Subscription sub) throws Throwable{
         // Re-dispatch the last message seen.
-        int t = tail;
+        int t=tail;
         // The buffer may not have rolled over yet..., start from the front
-        if( messages[t]==null )
+        if(messages[t]==null)
             t=0;
         // Well the buffer is really empty then.
-        if( messages[t]==null )
+        if(messages[t]==null)
             return;
-        
         // Keep dispatching until t hit's tail again.
-        MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
-        do {            
-             MessageReference node = messages[t];
-             try {
-                 msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
-                 msgContext.setMessageReference(node);                        
-                 if (sub.matches(node, msgContext)) {
-                     sub.add(node);
-                 }
-             } finally {
-                 msgContext.clear();
-             }
-             t++;
-             if( t >= messages.length )
-                 t = 0;
-        } while( t!=tail );
-        
-    }
-
-    public void start() throws Exception {
-        messages = new MessageReference[maximumSize];
+        MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
+        do{
+            MessageReference node=messages[t];
+            try{
+                msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
+                msgContext.setMessageReference(node);
+                if(sub.matches(node,msgContext)){
+                    sub.add(node);
+                }
+            }finally{
+                msgContext.clear();
+            }
+            t++;
+            if(t>=messages.length)
+                t=0;
+        }while(t!=tail);
+    }
+
+    public void start() throws Exception{
+        messages=new MessageReference[maximumSize];
     }
 
-    public void stop() throws Exception {
-        messages = null;
+    public void stop() throws Exception{
+        messages=null;
     }
-    
-    public int getMaximumSize() {
+
+    public int getMaximumSize(){
         return maximumSize;
     }
 
     /**
      * Sets the maximum number of messages that this destination will hold around in RAM
      */
-    public void setMaximumSize(int maximumSize) {
-        this.maximumSize = maximumSize;
+    public void setMaximumSize(int maximumSize){
+        this.maximumSize=maximumSize;
     }
 
-
+    public Message[] browse(ActiveMQDestination destination) throws Throwable{
+        List result=new ArrayList();
+        DestinationFilter filter=DestinationFilter.parseFilter(destination);
+        int t=tail;
+        if(messages[t]==null)
+            t=0;
+        if(messages[t]!=null){
+            do{
+                MessageReference ref=messages[t];
+                Message message=ref.getMessage();
+                if(filter.matches(message.getDestination())){
+                    result.add(message);
+                }
+                t++;
+                if(t>=messages.length)
+                    t=0;
+            }while(t!=tail);
+        }
+        return (Message[]) result.toArray(new Message[result.size()]);
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java Tue Feb 28 04:55:21 2006
@@ -20,11 +20,16 @@
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy.TimestampWrapper;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.memory.list.DestinationBasedMessageList;
 import org.apache.activemq.memory.list.MessageList;
 import org.apache.activemq.memory.list.SimpleMessageList;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -103,6 +108,10 @@
 
     public void setUseSharedBuffer(boolean useSharedBuffer) {
         this.useSharedBuffer = useSharedBuffer;
+    }
+    
+    public Message[] browse(ActiveMQDestination destination) throws Throwable{
+        return buffer.browse(destination);
     }
 
     // Implementation methods

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java Tue Feb 28 04:55:21 2006
@@ -16,10 +16,15 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 /**
@@ -60,6 +65,15 @@
     }
 
     public void stop() throws Exception {
+    }
+
+    public Message[] browse(ActiveMQDestination destination) throws Throwable{
+        List result = new ArrayList();
+        DestinationFilter filter=DestinationFilter.parseFilter(destination);
+        if (filter.matches(lastImage.getMessage().getDestination())){
+            result.add(lastImage.getMessage());
+        }
+        return (Message[])result.toArray(new Message[result.size()]);
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java Tue Feb 28 04:55:21 2006
@@ -20,6 +20,8 @@
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
 
 /**
  * This is the default Topic recovery policy which does not recover any messages.
@@ -41,6 +43,10 @@
     }
 
     public void stop() throws Exception {
+    }
+
+    public Message[] browse(ActiveMQDestination dest) throws Throwable{
+        return new Message[0];
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java Tue Feb 28 04:55:21 2006
@@ -98,6 +98,10 @@
     public void setQuery(MessageQuery query) {
         this.query = query;
     }
+    
+    public org.apache.activemq.command.Message[] browse(ActiveMQDestination dest) throws Throwable{
+        return new org.apache.activemq.command.Message[0];
+    }
 
     protected void dispatchInitialMessage(Message message,  Destination regionDestination, MessageEvaluationContext msgContext, Subscription sub) {
         try {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java Tue Feb 28 04:55:21 2006
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
 
 /**
  * Abstraction to allow different recovery policies to be plugged
@@ -35,8 +38,9 @@
      * A message was sent to the destination.
      * 
      * @param context
+     * @param message 
      * @param node
-     * @return TODO
+     * @return true if successful
      * @throws Throwable
      */
     boolean add(ConnectionContext context, MessageReference message) throws Throwable;
@@ -45,11 +49,19 @@
      * Let a subscription recover message held by the policy.
      * 
      * @param context
-     * @param topic TODO
-     * @param topic 
+     * @param topic
+     * @param sub 
      * @param node
      * @throws Throwable
      */
     void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable;
+    
+    
+    /**
+     * @param dest 
+     * @return messages
+     * @throws Throwable 
+     */
+    Message[] browse(ActiveMQDestination dest) throws Throwable;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Tue Feb 28 04:55:21 2006
@@ -16,19 +16,21 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.thread.Scheduler;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
 /**
  * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed
  * buffer of messages around in memory and use that to recover new
@@ -121,6 +123,21 @@
 
     public void setRecoverDuration(long recoverDuration) {
         this.recoverDuration = recoverDuration;
+    }
+    
+    public Message[] browse(ActiveMQDestination destination) throws Throwable{
+        List result = new ArrayList();
+        ArrayList copy = new ArrayList(buffer);
+        DestinationFilter filter=DestinationFilter.parseFilter(destination);
+        for (Iterator iter = copy.iterator(); iter.hasNext();) {
+            TimestampWrapper timestampWrapper = (TimestampWrapper) iter.next();
+            MessageReference ref = timestampWrapper.message;
+            Message message=ref.getMessage();
+            if (filter.matches(message.getDestination())){
+                result.add(message);
+            }
+        }
+        return (Message[]) result.toArray(new Message[result.size()]);
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java Tue Feb 28 04:55:21 2006
@@ -22,11 +22,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.memory.buffer.MessageBuffer;
 import org.apache.activemq.memory.buffer.MessageQueue;
@@ -70,9 +70,13 @@
     }
 
     public List getMessages(Subscription sub) {
+        return getMessages(sub.getConsumerInfo().getDestination());
+    }
+    
+    protected  List getMessages(ActiveMQDestination destination) {
         Set set = null;
         synchronized (lock) {
-            set = subscriptionIndex.get(sub.getConsumerInfo().getDestination());
+            set = subscriptionIndex.get(destination);
         }
         List answer = new ArrayList();
         for (Iterator iter = set.iterator(); iter.hasNext();) {
@@ -81,6 +85,12 @@
         }
         return answer;
     }
+    
+    public Message[] browse(ActiveMQDestination destination) {
+        List result = getMessages(destination);
+        return (Message[])result.toArray(new Message[result.size()]);
+    }
+
 
     public void clear() {
         messageBuffer.clear();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/MessageList.java Tue Feb 28 04:55:21 2006
@@ -18,6 +18,8 @@
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
 
 import java.util.List;
 
@@ -35,6 +37,14 @@
      * Returns the current list of MessageReference objects for the given subscription
      */
     List getMessages(Subscription sub);
+    
+    /**
+     * @param destination
+     * @return an array of Messages that match the destination
+     */
+    Message[] browse(ActiveMQDestination destination);
 
     void clear();
+    
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/SimpleMessageList.java Tue Feb 28 04:55:21 2006
@@ -18,8 +18,16 @@
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -32,7 +40,7 @@
  * @version $Revision: 1.1 $
  */
 public class SimpleMessageList implements MessageList {
-
+    static final private Log log=LogFactory.getLog(SimpleMessageList.class);
     private LinkedList list = new LinkedList();
     private int maximumSize = 100 * 64 * 1024;
     private int size;
@@ -59,6 +67,27 @@
 
     public List getMessages(Subscription sub) {
         return getList();
+    }
+    
+    public Message[] browse(ActiveMQDestination destination) {
+        List result = new ArrayList();
+        DestinationFilter filter=DestinationFilter.parseFilter(destination);
+        synchronized(lock){
+            for (Iterator i = list.iterator(); i.hasNext();){
+                MessageReference ref = (MessageReference)i.next();
+                Message msg;
+                try{
+                    msg=ref.getMessage();
+                    if (filter.matches(msg.getDestination())){
+                        result.add(msg);
+                    }
+                }catch(IOException e){
+                   log.error("Failed to get Message from MessageReference: " + ref,e);
+                }
+                
+            }
+        }
+        return (Message[])result.toArray(new Message[result.size()]);
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java Tue Feb 28 04:55:21 2006
@@ -24,4 +24,5 @@
 public interface MessageRecoveryListener {
     void recoverMessage(Message message) throws Throwable;
     void recoverMessageReference(String messageReference) throws Throwable;
+    void finished();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java Tue Feb 28 04:55:21 2006
@@ -25,4 +25,5 @@
 public interface JDBCMessageRecoveryListener {
     void recoverMessage(long sequenceId, byte[] message) throws IOException, Throwable;
     void recoverMessageReference(String reference) throws IOException, Throwable;
+    void finished();
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Tue Feb 28 04:55:21 2006
@@ -158,6 +158,9 @@
                 public void recoverMessageReference(String reference) throws IOException, Throwable {
                     listener.recoverMessageReference(reference);
                 }
+                public void finished(){
+                    listener.finished();
+                }
             });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Tue Feb 28 04:55:21 2006
@@ -75,6 +75,10 @@
                         public void recoverMessageReference(String reference) throws IOException, Throwable {
                             listener.recoverMessageReference(reference);
                         }
+                        
+                        public void finished(){
+                            listener.finished();
+                        }
                     });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ",e);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Tue Feb 28 04:55:21 2006
@@ -337,12 +337,12 @@
                     listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2));
                 }
             }
-
         }
         finally {
             close(rs);
             close(s);
-        }
+            listener.finished();
+        }     
     }
 
     
@@ -402,7 +402,9 @@
         finally {
             close(rs);
             close(s);
+            listener.finished();
         }
+        
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java Tue Feb 28 04:55:21 2006
@@ -360,6 +360,10 @@
                 Message message = (Message) peristenceAdapter.readCommand(loc);
                 listener.recoverMessage(message);
             }
+            
+            public void finished(){
+                listener.finished();
+            }
         });
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Tue Feb 28 04:55:21 2006
@@ -63,6 +63,10 @@
                 Message message = (Message) peristenceAdapter.readCommand(loc);
                 listener.recoverMessage(message);
             }
+            
+            public void finished(){
+                listener.finished();
+            }
         });
 
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Tue Feb 28 04:55:21 2006
@@ -82,6 +82,7 @@
                     listener.recoverMessage((Message) msg);
                 }
             }
+            listener.finished();
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=381644&r1=381643&r2=381644&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Tue Feb 28 04:55:21 2006
@@ -107,6 +107,7 @@
                     pastLastAck=entry.getKey().equals(lastAck);
                 }
             }
+            listener.finished();
         }
     }
 



Mime
View raw message