activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r438917 [1/2] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ test/java/or...
Date Thu, 31 Aug 2006 13:42:18 GMT
Author: rajdavies
Date: Thu Aug 31 06:42:14 2006
New Revision: 438917

URL: http://svn.apache.org/viewvc?rev=438917&view=rev
Log:
Apply patch http://issues.apache.org/activemq/browse/AMQ-877

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/CachedContainerListIterator.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/VolumeTest.java   (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
    incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/store/kahabroker.xml

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Thu Aug 31 06:42:14 2006
@@ -22,22 +22,21 @@
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.QueueRegion;
 import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 
 public class ManagedQueueRegion extends QueueRegion {
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
-        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
+    public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         regionBroker = broker;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Thu Aug 31 06:42:14 2006
@@ -42,6 +42,8 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
+import org.apache.activemq.broker.region.DestinationFactoryImpl;
 import org.apache.activemq.broker.region.DestinationInterceptor;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Region;
@@ -89,9 +91,9 @@
     private Broker contextBroker;
 
     public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
-                    TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter, DestinationInterceptor destinationInterceptor)
+                    TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor)
                     throws IOException{
-        super(brokerService,taskRunnerFactory,memoryManager,adapter, destinationInterceptor);
+        super(brokerService,taskRunnerFactory,memoryManager, destinationFactory, destinationInterceptor);
         this.mbeanServer=mbeanServer;
         this.brokerObjectName=brokerObjectName;
     }
@@ -119,33 +121,39 @@
     }
 
     protected Region createQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
-                    PersistenceAdapter adapter){
-        return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
+            DestinationFactory destinationFactory){
+        return new ManagedQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
     }
 
-    protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
-        return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
+    protected Region createTempQueueRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){
+        return new ManagedTempQueueRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory){
-        return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory);
+    protected Region createTempTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory){
+        return new ManagedTempTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
     }
 
     protected Region createTopicRegion(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
-                    PersistenceAdapter adapter){
-        return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory,adapter);
+            DestinationFactory destinationFactory){
+        return new ManagedTopicRegion(this,destinationStatistics,memoryManager,taskRunnerFactory, destinationFactory);
     }
 
     public void register(ActiveMQDestination destName,Destination destination){
+        // TODO refactor to allow views for custom destinations
         try{
             ObjectName objectName=createObjectName(destName);
             DestinationView view;
-            if(destination instanceof Queue){
+            if (destination instanceof Queue) {
                 view=new QueueView(this,(Queue) destination);
-            }else{
+            } else if (destination instanceof Topic){
                 view=new TopicView(this,(Topic) destination);
+            } else {
+                view = null;
+                log.warn("JMX View is not supported for custom destination: " + destination);
+            }
+            if (view != null) {
+                registerDestination(objectName,destName,view);
             }
-            registerDestination(objectName,destName,view);
         }catch(Exception e){
             log.error("Failed to register destination "+destName,e);
         }
@@ -288,13 +296,12 @@
 
     protected void buildExistingSubscriptions() throws Exception{
         Map subscriptions=new HashMap();
-        Set destinations=adaptor.getDestinations();
+        Set destinations=destinationFactory.getDestinations();
         if(destinations!=null){
             for(Iterator iter=destinations.iterator();iter.hasNext();){
                 ActiveMQDestination dest=(ActiveMQDestination) iter.next();
                 if(dest.isTopic()){
-                    TopicMessageStore store=adaptor.createTopicMessageStore((ActiveMQTopic) dest);
-                    SubscriptionInfo[] infos=store.getAllSubscriptions();
+                    SubscriptionInfo[] infos= destinationFactory.getAllDurableSubscriptions((ActiveMQTopic) dest);
                     if(infos!=null){
                         for(int i=0;i<infos.length;i++){
                             SubscriptionInfo info=infos[i];
@@ -356,10 +363,15 @@
     }
 
     protected List getSubscriberMessages(SubscriptionView view){
+        //TODO This is a very dangerous operation for big backlogs
+        if (!(destinationFactory instanceof DestinationFactoryImpl)) {
+            throw new RuntimeException("unsupported by " + destinationFactory);
+        }
+        PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter(); 
         final List result=new ArrayList();
         try{
             ActiveMQTopic topic=new ActiveMQTopic(view.getDestinationName());
-            TopicMessageStore store=adaptor.createTopicMessageStore(topic);
+            TopicMessageStore store=adapter.createTopicMessageStore(topic);
             store.recover(new MessageRecoveryListener(){
                 public void recoverMessage(Message message) throws Exception{
                     result.add(message);
@@ -373,6 +385,7 @@
             log.error("Failed to browse messages for Subscription "+view,e);
         }
         return result;
+        
     }
 
     protected ObjectName[] getTopics(){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Thu Aug 31 06:42:14 2006
@@ -22,6 +22,7 @@
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TempQueueRegion;
@@ -34,8 +35,8 @@
 
     private final ManagedRegionBroker regionBroker;
     
-    public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
+    public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         this.regionBroker = regionBroker;
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Thu Aug 31 06:42:14 2006
@@ -22,6 +22,7 @@
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TempTopicRegion;
@@ -34,8 +35,8 @@
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
+    public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         this.regionBroker = regionBroker;
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Thu Aug 31 06:42:14 2006
@@ -22,21 +22,21 @@
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFactory;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 
 public class ManagedTopicRegion extends TopicRegion {
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
-        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
+    public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         regionBroker = broker;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Aug 31 06:42:14 2006
@@ -35,7 +35,6 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,7 +53,7 @@
     protected final DestinationMap destinationMap = new DestinationMap();
     protected final ConcurrentHashMap subscriptions = new ConcurrentHashMap();
     protected final UsageManager memoryManager;
-    protected final PersistenceAdapter persistenceAdapter;
+    protected final DestinationFactory destinationFactory;
     protected final DestinationStatistics destinationStatistics;
     protected final RegionBroker broker;
     protected boolean autoCreateDestinations=true;
@@ -62,12 +61,18 @@
     protected final Object destinationsMutex = new Object();
     protected final Map consumerChangeMutexMap = new HashMap();
 
-    public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
+    public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        if (broker == null) {
+            throw new IllegalArgumentException("null broker");
+        }
         this.broker = broker;
         this.destinationStatistics = destinationStatistics;
         this.memoryManager = memoryManager;
         this.taskRunnerFactory = taskRunnerFactory;
-        this.persistenceAdapter = persistenceAdapter;
+        if (destinationFactory == null) { // required: used to create and list destinations
+            throw new IllegalArgumentException("null destinationFactory");
+        }
+        this.destinationFactory = destinationFactory;
     }
 
     public void start() throws Exception {
@@ -205,15 +210,14 @@
             // eagerly load all destinations into the broker but have an inactive state for the
             // destination which has reduced memory usage.
             //
-            if( persistenceAdapter!=null ) {
-                Set inactiveDests = getInactiveDestinations();
-                for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
-                    ActiveMQDestination dest = (ActiveMQDestination) iter.next();
-                    if( sub.matches(dest) ) {
-                        context.getBroker().addDestination(context, dest);
-                    }
-                }
+            Set inactiveDests = getInactiveDestinations();
+            for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
+            	ActiveMQDestination dest = (ActiveMQDestination) iter.next();
+            	if( sub.matches(dest) ) {
+            		context.getBroker().addDestination(context, dest);
+            	}
             }
+            
 
             subscriptions.put(info.getConsumerId(), sub);
 
@@ -243,14 +247,14 @@
      * @return Set of all stored destinations
      */
     public Set getDurableDestinations(){
-        return persistenceAdapter.getDestinations();
+        return destinationFactory.getDestinations();
     }
 
     /**
      * @return all Destinations that don't have active consumers
      */
     protected Set getInactiveDestinations() {
-        Set inactiveDests = persistenceAdapter.getDestinations();
+        Set inactiveDests = destinationFactory.getDestinations();
         inactiveDests.removeAll( destinations.keySet() );
         return inactiveDests;
     }
@@ -341,7 +345,10 @@
     }
 
     protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
-    abstract protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
+    
+    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
+        return destinationFactory.createDestination(context, destination, destinationStatistics);
+    }
 
     public boolean isAutoCreateDestinations() {
         return autoCreateDestinations;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Thu Aug 31 06:42:14 2006
@@ -25,9 +25,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.MessageStore;
 
 /**
  * 
@@ -43,7 +41,6 @@
     void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
     
     void gc();
-    Message loadMessage(MessageId messageId) throws IOException;
  
     ActiveMQDestination getActiveMQDestination();
     UsageManager getUsageManager();
@@ -51,7 +48,6 @@
     void dispose(ConnectionContext context) throws IOException;
     
     DestinationStatistics getDestinationStatistics();
-    MessageStore getMessageStore();
     DeadLetterStrategy getDeadLetterStrategy();
     
     public Message[] browse();

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java?rev=438917&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java Thu Aug 31 06:42:14 2006
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+
+/**
+ * Used to create Destinations. One instance of DestinationFactory is used per BrokerService. 
+ * 
+ * @author fateev@amazon.com
+ * @version $Revision$
+ */
+public abstract class DestinationFactory {
+    
+    /**
+     * Create destination implementation.
+     */
+    abstract public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception;
+
+    /**
+     * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
+     * objects that the persistence store is aware exist.
+     */
+    abstract public Set getDestinations();
+
+    /**
+     * Lists all the durable subscriptions for a given destination.
+     */
+    abstract public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException;
+
+    
+    abstract public long getLastMessageBrokerSequenceId() throws IOException;
+
+    abstract public void setRegionBroker(RegionBroker regionBroker);
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=438917&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Thu Aug 31 06:42:14 2006
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.Set;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.thread.TaskRunnerFactory;
+
+/**
+ * Creates standard ActiveMQ implementations of {@link org.apache.activemq.broker.region.Destination}.
+ * 
+ * @author fateev@amazon.com
+ * @version $Revision$
+ */
+public class DestinationFactoryImpl extends DestinationFactory {
+
+    protected final UsageManager memoryManager;
+    protected final TaskRunnerFactory taskRunnerFactory;
+    protected final PersistenceAdapter persistenceAdapter;
+    protected RegionBroker broker;
+
+    public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+            PersistenceAdapter persistenceAdapter) {
+        this.memoryManager = memoryManager;
+        this.taskRunnerFactory = taskRunnerFactory;
+        if (persistenceAdapter == null) {
+            throw new IllegalArgumentException("null persistenceAdapter");
+        }
+        this.persistenceAdapter = persistenceAdapter;
+    }
+
+    public void setRegionBroker(RegionBroker broker) {
+        if (broker == null) {
+            throw new IllegalArgumentException("null broker");
+        }
+        this.broker = broker;
+    }
+
+    public Set getDestinations() {
+        return persistenceAdapter.getDestinations();
+    }
+
+    /**
+     * @return instance of {@link Queue} or {@link Topic}
+     */
+    public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
+        if (destination.isQueue()) {
+            if (destination.isTemporary()) {
+                final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
+                return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
+                    
+                    public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
+                        // Only consumers on the same connection can consume from 
+                        // the temporary destination
+                        if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+                            throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
+                        }
+                        super.addSubscription(context, sub);
+                    };
+                };
+            } else {
+                MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
+                Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
+                configureQueue(queue, destination);
+                return queue;
+            }
+        } else if (destination.isTemporary()){
+            final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
+            return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
+                public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
+                    // Only consumers on the same connection can consume from 
+                    // the temporary destination
+                    if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+                        throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
+                    }
+                    super.addSubscription(context, sub);
+                };
+            };
+        } else {
+            TopicMessageStore store = null;
+            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+                store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
+            }
+            
+            Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
+            configureTopic(topic, destination);
+            
+            return topic;
+        }
+    }
+
+    protected void configureQueue(Queue queue, ActiveMQDestination destination) {
+        if (broker == null) {
+            throw new IllegalStateException("broker property is not set");
+        }
+        if (broker.getDestinationPolicy() != null) {
+            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
+            if (entry != null) {
+                entry.configure(queue);
+            }
+        }
+    }
+
+    protected void configureTopic(Topic topic, ActiveMQDestination destination) {
+        if (broker == null) {
+            throw new IllegalStateException("broker property is not set");
+        }
+        if (broker.getDestinationPolicy() != null) {
+            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
+            if (entry != null) {
+                entry.configure(topic);
+            }
+        }
+    }
+
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        return persistenceAdapter.getLastMessageBrokerSequenceId();
+    }
+
+    public PersistenceAdapter getPersistenceAdapter() {
+        return persistenceAdapter;
+    }
+
+    public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
+        return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Thu Aug 31 06:42:14 2006
@@ -23,9 +23,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.MessageStore;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -99,10 +97,6 @@
         return next.getMessagesCached();
     }
 
-    public MessageStore getMessageStore() {
-        return next.getMessageStore();
-    }
-
     public String getName() {
         return next.getName();
     }
@@ -113,10 +107,6 @@
 
     public UsageManager getUsageManager() {
         return next.getUsageManager();
-    }
-
-    public Message loadMessage(MessageId messageId) throws IOException {
-        return next.loadMessage(messageId);
     }
 
     public boolean lock(MessageReference node, LockOwner lockOwner) {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java?rev=438917&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java Thu Aug 31 06:42:14 2006
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+
+/**
+ * Only used by the {@link QueueMessageReference#END_OF_BROWSE_MARKER} 
+ */
+final class EndOfBrowseMarkerQueueMessageReference implements
+		QueueMessageReference {
+
+	private ActiveMQMessage message = new ActiveMQMessage();
+	private volatile int references;
+	
+	public void drop() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public LockOwner getLockOwner() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public boolean isAcked() {
+		return false;
+	}
+
+	public boolean isDropped() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public boolean lock(LockOwner subscription) {
+		throw new RuntimeException("not implemented");
+	}
+
+	public void setAcked(boolean b) {
+		throw new RuntimeException("not implemented");
+	}
+
+	public void unlock() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public int decrementReferenceCount() {
+		return --references;
+	}
+
+	public long getExpiration() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public String getGroupID() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public int getGroupSequence() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public Message getMessage() throws IOException {
+		return message;
+	}
+
+	public Message getMessageHardRef() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public MessageId getMessageId() {
+		return message.getMessageId();
+	}
+
+	public int getRedeliveryCounter() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public int getReferenceCount() {
+		return references;
+	}
+
+	public Destination getRegionDestination() {
+		return null;
+	}
+
+	public int getSize() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public ConsumerId getTargetConsumerId() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public void incrementRedeliveryCounter() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public int incrementReferenceCount() {
+		return ++references;
+	}
+
+	public boolean isExpired() {
+		throw new RuntimeException("not implemented");
+	}
+
+	public boolean isPersistent() {
+		throw new RuntimeException("not implemented");
+	}
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Thu Aug 31 06:42:14 2006
@@ -19,10 +19,10 @@
 
 import java.io.IOException;
 
-import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
 
 /**
  * Keeps track of a message that is flowing through the Broker.  This 
@@ -31,13 +31,13 @@
  * 
  * @version $Revision: 1.15 $
  */
-public class IndirectMessageReference implements MessageReference {
-    
-    public static final ActiveMQMessage END_OF_BROWSE_MARKER_MESSAGE = new ActiveMQMessage();
-    public static final IndirectMessageReference END_OF_BROWSE_MARKER = new IndirectMessageReference(END_OF_BROWSE_MARKER_MESSAGE);
+public class IndirectMessageReference implements QueueMessageReference {
 
     /** The destination that is managing the message */
     private final Destination regionDestination;
+    
+    private final MessageStore destinationStore;
+    
     /** The id of the message is always valid */
     private final MessageId messageId;
     /** Is the message persistent? */
@@ -63,23 +63,9 @@
     /** the expiration time of the message */
     private long expiration;
     
-    /**
-     * Only used by the END_OF_BROWSE_MARKER singleton
-     */
-    private IndirectMessageReference(ActiveMQMessage message) {
-        this.regionDestination=null;
-        this.message = message;
-        this.messageId=null;
-        this.persistent=false;
-        this.groupID = null;
-        this.groupSequence = 0;
-        this.targetConsumerId=null;
-        this.expiration = message.getExpiration();
-        this.cachedSize = message != null ? message.getSize() : 0;
-    }
-
-    public IndirectMessageReference(Destination destination, Message message) {
+    public IndirectMessageReference(Queue destination, MessageStore destinationStore, Message message) {
         this.regionDestination=destination;
+        this.destinationStore = destinationStore;
         this.message = message;
         this.messageId=message.getMessageId();
         this.persistent=message.isPersistent() && destination.getMessageStore()!=null;
@@ -106,10 +92,11 @@
         if( persistent && rc==1 ) {
             assert message == null;            
             try {
-                message = regionDestination.loadMessage(messageId);
+                message = destinationStore.getMessage(messageId);
                 if( message == null ) {
                     dropped = true;
                 } else {
+                    message.setRegionDestination(regionDestination);
                     message.incrementReferenceCount();
                 }
             } catch (IOException e) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReference.java Thu Aug 31 06:42:14 2006
@@ -49,6 +49,8 @@
     public ConsumerId getTargetConsumerId();
     public int getSize();
     public long getExpiration();
+    public String getGroupID();
+    public int getGroupSequence();
     
     /**
      * Returns true if this message is expired

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Aug 31 06:42:14 2006
@@ -160,7 +160,7 @@
                 // subscription.
                 for (Iterator iter = messages.iterator(); iter.hasNext();) {
 
-                    IndirectMessageReference node = (IndirectMessageReference) iter.next();
+                    QueueMessageReference node = (QueueMessageReference) iter.next();
                     if (node.isDropped()) {
                         continue;
                     }
@@ -220,7 +220,7 @@
                     List messagesToDispatch = new ArrayList();
                     synchronized (messages) {
                         for (Iterator iter = messages.iterator(); iter.hasNext();) {
-                            IndirectMessageReference node = (IndirectMessageReference) iter.next();
+                            QueueMessageReference node = (QueueMessageReference) iter.next();
                             if (node.isDropped()) {
                                 continue;
                             }
@@ -237,7 +237,7 @@
                     // now lets dispatch from the copy of the collection to
                     // avoid deadlocks
                     for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();) {
-                        IndirectMessageReference node = (IndirectMessageReference) iter.next();
+                        QueueMessageReference node = (QueueMessageReference) iter.next();
                         node.incrementRedeliveryCounter();
                         node.unlock();
                         msgContext.setMessageReference(node);
@@ -316,7 +316,7 @@
         synchronized (messages) {
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
                 // Remove dropped messages from the queue.
-                IndirectMessageReference node = (IndirectMessageReference) iter.next();
+                QueueMessageReference node = (QueueMessageReference) iter.next();
                 if (node.isDropped()) {
                     garbageSize--;
                     iter.remove();
@@ -345,7 +345,7 @@
         }
     }
 
-    public Message loadMessage(MessageId messageId) throws IOException {
+    Message loadMessage(MessageId messageId) throws IOException {
         Message msg = store.getMessage(messageId);
         if (msg != null) {
             msg.setRegionDestination(this);
@@ -460,7 +460,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     private MessageReference createMessageReference(Message message) {
-        return new IndirectMessageReference(this, message);
+        return new IndirectMessageReference(this, store, message);
     }
 
     private void dispatch(ConnectionContext context, MessageReference node, Message message) throws Exception {
@@ -504,7 +504,7 @@
         return rc;
     }
 
-    public MessageStore getMessageStore() {
+    MessageStore getMessageStore() {
         return store;
     }
 
@@ -565,7 +565,7 @@
             ConnectionContext c = createConnectionContext();
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
                 try {
-                    IndirectMessageReference r = (IndirectMessageReference) iter.next();
+                    QueueMessageReference r = (QueueMessageReference) iter.next();
 
                     // We should only delete messages that can be locked.
                     if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Thu Aug 31 06:42:14 2006
@@ -38,7 +38,7 @@
     }
         
     protected boolean canDispatch(MessageReference node) {
-        return !((IndirectMessageReference)node).isAcked();
+        return !((QueueMessageReference)node).isAcked();
     }
     
     public String toString() {
@@ -53,11 +53,11 @@
 
     public void browseDone() throws Exception {
         browseDone = true;
-        add(IndirectMessageReference.END_OF_BROWSE_MARKER);
+        add(QueueMessageReference.END_OF_BROWSE_MARKER);
     }
     
     protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
-        if( node == IndirectMessageReference.END_OF_BROWSE_MARKER ) {
+        if( node == QueueMessageReference.END_OF_BROWSE_MARKER ) {
             MessageDispatch md = new MessageDispatch();
             md.setMessage(null);
             md.setConsumerId( info.getConsumerId() );

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java?rev=438917&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java Thu Aug 31 06:42:14 2006
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+/**
+ * Queue specific MessageReference.
+ * 
+ * @author fateev@amazon.com
+ * @version $Revision$
+ */
+public interface QueueMessageReference extends MessageReference {
+
+    public static final QueueMessageReference END_OF_BROWSE_MARKER = new EndOfBrowseMarkerQueueMessageReference();
+
+    public boolean isAcked();
+    
+    public void setAcked(boolean b);
+    
+    public void drop();
+ 
+    public boolean isDropped();
+        
+    public boolean lock(LockOwner subscription);
+    
+    public void unlock();
+    
+    public LockOwner getLockOwner();
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Thu Aug 31 06:42:14 2006
@@ -17,16 +17,10 @@
  */
 package org.apache.activemq.broker.region;
 
-import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 
 import javax.jms.InvalidSelectorException;
@@ -43,31 +37,13 @@
     
 
     public QueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
-            PersistenceAdapter persistenceAdapter) {
-        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
+            DestinationFactory destinationFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
     public String toString() {
         return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage()
                 + "%";
-    }
-
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
-        MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
-        Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
-        configureQueue(queue, destination);
-        return queue;
-    }
-
-    protected void configureQueue(Queue queue, ActiveMQDestination destination) {
-        if (broker.getDestinationPolicy() != null) {
-            PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
-            if (entry != null) {
-                entry.configure(queue);
-            }
-        }
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Thu Aug 31 06:42:14 2006
@@ -49,11 +49,13 @@
      */
     protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
         
-        final IndirectMessageReference node = (IndirectMessageReference) n;
 
-        final Queue queue = (Queue)node.getRegionDestination();
-        queue.acknowledge(context, this, ack, node);
-        
+
+        final Destination q = n.getRegionDestination();
+        q.acknowledge(context, this, ack, n);
+
+        final QueueMessageReference node = (QueueMessageReference) n;
+        final Queue queue = (Queue)q;
         if( !ack.isInTransaction() ) {
             node.drop();            
             queue.dropEvent();
@@ -72,10 +74,9 @@
     }
     
     protected boolean canDispatch(MessageReference n) throws IOException {
-        IndirectMessageReference node = (IndirectMessageReference) n;
+        QueueMessageReference node = (QueueMessageReference) n;
         if( node.isAcked() )
             return false;
-        
         // Keep message groups together.
         String groupId = node.getGroupID();
         int sequence = node.getGroupSequence();
@@ -121,7 +122,6 @@
         } else {
             return node.lock(this);
         }
-        
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Aug 31 06:42:14 2006
@@ -87,20 +87,22 @@
     private BrokerId brokerId;
     private String brokerName;
     private Map clientIdSet = new HashMap(); // we will synchronize access
-    protected  PersistenceAdapter adaptor;
     private final DestinationInterceptor destinationInterceptor;
     private ConnectionContext adminConnectionContext;
+    protected DestinationFactory destinationFactory;
         
-    public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, PersistenceAdapter adapter, DestinationInterceptor destinationInterceptor) throws IOException {
+    public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
         this.brokerService = brokerService;
+        if (destinationFactory == null) {
+            throw new IllegalArgumentException("null destinationFactory");
+        }
+        this.sequenceGenerator.setLastSequenceId( destinationFactory.getLastMessageBrokerSequenceId() );
+        this.destinationFactory = destinationFactory;
+        queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
+        topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
         this.destinationInterceptor = destinationInterceptor;
-        this.sequenceGenerator.setLastSequenceId( adapter.getLastMessageBrokerSequenceId() );
-        this.adaptor = adapter;//weird - both are valid spellings ...
-        queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, adapter);
-        topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, adapter);
-        
-        tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory);
-        tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory);        
+        tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
+        tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);        
     }
     
     public Map getDestinationMap() {
@@ -147,20 +149,20 @@
         return topicRegion;
     }
 
-    protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
+    protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        return new TempTopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory);
+    protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        return new TempQueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter) {
-        return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter);
+    protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        return new TopicRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter adapter) {
-        return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, adapter);
+    protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        return new QueueRegion(this,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
     
     private static PersistenceAdapter createDefaultPersistenceAdapter(UsageManager memoryManager) throws IOException {
@@ -537,7 +539,7 @@
     }
     
     public Set getDurableDestinations(){
-        return adaptor != null ? adaptor.getDestinations() : Collections.EMPTY_SET;
+        return destinationFactory.getDestinations();
     }
     
     public boolean isFaultTolerantConfiguration(){

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Thu Aug 31 06:42:14 2006
@@ -18,12 +18,7 @@
 package org.apache.activemq.broker.region;
 
 import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
-import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -34,25 +29,9 @@
  */
 public class TempQueueRegion extends AbstractRegion {
 
-    public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
+    public TempQueueRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         setAutoCreateDestinations(false);
-    }
-
-    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
-        final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
-        return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
-            
-            public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
-                // Only consumers on the same connection can consume from 
-                // the temporary destination
-                if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
-                    throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
-                }
-                super.addSubscription(context, sub);
-            };
-            
-        };
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Thu Aug 31 06:42:14 2006
@@ -19,10 +19,7 @@
 
 import javax.jms.JMSException;
 
-import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -33,25 +30,9 @@
  */
 public class TempTopicRegion extends AbstractRegion {
 
-    public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
-        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, null);
+    public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         setAutoCreateDestinations(false);
-    }
-
-    protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
-        final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
-        return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
-            
-            public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
-                // Only consumers on the same connection can consume from 
-                // the temporary destination
-                if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
-                    throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
-                }
-                super.addSubscription(context, sub);
-            };
-            
-        };
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Thu Aug 31 06:42:14 2006
@@ -17,17 +17,16 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.Set;
 
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 
-import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -35,7 +34,6 @@
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -57,8 +55,8 @@
     private boolean keepDurableSubsActive=false;
 
     public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
-            PersistenceAdapter persistenceAdapter) {
-        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, persistenceAdapter);
+            DestinationFactory destinationFactory) {
+        super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         
     }
 
@@ -160,14 +158,15 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
-        TopicMessageStore store = null;
-        if (!AdvisorySupport.isAdvisoryTopic(destination)){
-            store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
-        }
-        
-        Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
-        configureTopic(topic, destination);
+        Topic topic = (Topic) super.createDestination(context, destination);
+ 
+        recoverDurableSubscriptions(context, topic);
         
+        return topic;
+    }
+
+    private void recoverDurableSubscriptions(ConnectionContext context, Topic topic) throws IOException, JMSException, Exception {
+        TopicMessageStore store = (TopicMessageStore) topic.getMessageStore();
         // Eagerly recover the durable subscriptions
         if (store != null) {            
             SubscriptionInfo[] infos = store.getAllSubscriptions();
@@ -191,8 +190,6 @@
                 topic.addSubscription(context, sub);
             }            
         }
-        
-        return topic;
     }
     
     private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java Thu Aug 31 06:42:14 2006
@@ -100,6 +100,20 @@
      * @return true if successful
      */
     public boolean doRemove(int position);
-        
+    
+    /**
+     * @return the maximumCacheSize
+     */
+    public int getMaximumCacheSize();
+
+    /**
+     * @param maximumCacheSize the maximumCacheSize to set
+     */
+    public void setMaximumCacheSize(int maximumCacheSize);
+      
+    /**
+     * clear any cached values
+     */
+    public void clearCache();
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Thu Aug 31 06:42:14 2006
@@ -25,6 +25,12 @@
  * @version $Revision: 1.2 $
  */
 public interface Store{
+    
+    public final static Marshaller BytesMarshaller = new BytesMarshaller();
+    
+    public final static Marshaller ObjectMarshaller = new ObjectMarshaller();
+    
+    public final static Marshaller StringMarshaller = new StringMarshaller();
     /**
      * close the store
      * 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java Thu Aug 31 06:42:14 2006
@@ -31,7 +31,7 @@
 public abstract class BaseContainerImpl{
     private static final Log log=LogFactory.getLog(BaseContainerImpl.class);
     protected IndexItem root;
-    protected IndexLinkedList list;
+    protected IndexLinkedList indexList;
     protected IndexManager rootIndexManager; // IndexManager that contains the root
     protected IndexManager indexManager;
     protected DataManager dataManager;
@@ -59,8 +59,8 @@
             synchronized(mutex){
                 if (!initialized){
                     initialized= true;
-                    if (this.list == null){
-                        this.list=new DiskIndexLinkedList(indexManager,root);
+                    if (this.indexList == null){
+                        this.indexList=new DiskIndexLinkedList(indexManager,root);
                     }
                 }
             }
@@ -68,23 +68,23 @@
     }
     
     protected void clear(){
-        if (list != null){
-            list.clear();
+        if (indexList != null){
+            indexList.clear();
         }
     }
     
     /**
-     * @return the list
+     * @return the indexList
      */
     public IndexLinkedList getList(){
-        return list;
+        return indexList;
     }
 
     /**
-     * @param list the list to set
+     * @param indexList the indexList to set
      */
-    public void setList(IndexLinkedList list){
-        this.list=list;
+    public void setList(IndexLinkedList indexList){
+        this.indexList=indexList;
     }
 
     public abstract void unload();
@@ -99,7 +99,7 @@
     protected abstract void remove(IndexItem currentItem);
 
     protected final IndexLinkedList getInternalList(){
-        return list;
+        return indexList;
     }
 
     public final void close(){
@@ -143,24 +143,24 @@
         synchronized(mutex){
             loaded=true;
             synchronized(mutex){
-                List list=new ArrayList();
+                List indexList=new ArrayList();
                 try{
                     long nextItem=root.getNextItem();
                     while(nextItem!=Item.POSITION_NOT_SET){
                         IndexItem item=new IndexItem();
                         item.setOffset(nextItem);
-                        list.add(item);
+                        indexList.add(item);
                         nextItem=item.getNextItem();
                     }
                     root.setNextItem(Item.POSITION_NOT_SET);
                     updateIndex(root);
-                    for(int i=0;i<list.size();i++){
-                        IndexItem item=(IndexItem) list.get(i);
+                    for(int i=0;i<indexList.size();i++){
+                        IndexItem item=(IndexItem) indexList.get(i);
                         dataManager.removeInterestInFile(item.getKeyFile());
                         dataManager.removeInterestInFile(item.getValueFile());
                         indexManager.freeIndex(item);
                     }
-                    list.clear();
+                    indexList.clear();
                 }catch(IOException e){
                     log.error("Failed to clear Container "+getId(),e);
                     throw new RuntimeStoreException(e);

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/CachedContainerListIterator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/CachedContainerListIterator.java?rev=438917&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/CachedContainerListIterator.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/CachedContainerListIterator.java Thu Aug 31 06:42:14 2006
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.util.ListIterator;
+
+/** 
+* @version $Revision$
+*/
+public class CachedContainerListIterator implements ListIterator{
+    
+    protected ListContainerImpl container;
+    protected IndexLinkedList list;
+    protected int pos = 0;
+    protected int nextPos =0;
+    protected IndexItem currentItem;
+
+    protected CachedContainerListIterator(ListContainerImpl container,int start){
+        this.container=container;
+        this.list=list;
+        this.pos=start;
+        this.nextPos = this.pos;
+    }
+
+    
+    public boolean hasNext(){
+        return nextPos >= 0 && nextPos < container.size();
+    }
+
+    public Object next(){
+        pos = nextPos;
+        Object result = container.getCachedItem(pos);
+        nextPos++;
+        return result;
+    }
+
+    public void remove(){
+        container.remove(pos);
+        nextPos--;
+    }
+   
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#hasPrevious()
+     */
+    public boolean hasPrevious(){
+        return pos >= 0 && pos < container.size();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#previous()
+     */
+    public Object previous(){
+        Object result = container.getCachedItem(pos);
+        pos--;
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#nextIndex()
+     */
+    public int nextIndex(){
+        return pos +1;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#previousIndex()
+     */
+    public int previousIndex(){
+        return pos -1;
+    }
+
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#set(E)
+     */
+    public void set(Object o){
+        container.internalSet(pos,o);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.util.ListIterator#add(E)
+     */
+    public void add(Object o){
+        container.internalAdd(previousIndex()+1,o);
+      
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/CachedContainerListIterator.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/CachedContainerListIterator.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java Thu Aug 31 06:42:14 2006
@@ -103,7 +103,7 @@
      * @see java.util.ListIterator#add(E)
      */
     public void add(Object o){
-        IndexItem item=((ListContainerImpl) container).internalSet(previousIndex()+1,o);
+        IndexItem item=((ListContainerImpl) container).internalAdd(previousIndex()+1,o);
         nextItem=item;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Thu Aug 31 06:42:14 2006
@@ -23,6 +23,7 @@
 import java.util.Set;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.ObjectMarshaller;
+import org.apache.activemq.kaha.Store;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@@ -35,7 +36,7 @@
 
 class IndexRootContainer {
     private static final Log log=LogFactory.getLog(IndexRootContainer.class);
-    protected static final Marshaller rootMarshaller = new ObjectMarshaller();
+    protected static final Marshaller rootMarshaller = Store.ObjectMarshaller;
     protected IndexItem root;
     protected IndexManager indexManager;
     protected DataManager dataManager;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=438917&r1=438916&r2=438917&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Thu Aug 31 06:42:14 2006
@@ -40,14 +40,14 @@
     private static final Log log=LogFactory.getLog(KahaStore.class);
     private File directory;
 
-    private IndexRootContainer mapsContainer;
-    private IndexRootContainer listsContainer;
+    protected IndexRootContainer mapsContainer;
+    protected IndexRootContainer listsContainer;
     private Map lists=new ConcurrentHashMap();
     private Map maps=new ConcurrentHashMap();
     
     private Map dataManagers = new ConcurrentHashMap();
     private Map indexManagers = new ConcurrentHashMap();
-    private IndexManager rootIndexManager; //contains all the root indexes
+    protected IndexManager rootIndexManager; //contains all the root indexes
     
     private boolean closed=false;
     private String name;
@@ -221,7 +221,8 @@
         }
         return result;
     }
-
+    
+    
     public void deleteListContainer(Object id) throws IOException{
         initialize();
         ListContainerImpl container=(ListContainerImpl) lists.remove(id);



Mime
View raw message