activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r633639 [1/7] - in /activemq/sandbox/activemq-router: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/active...
Date Tue, 04 Mar 2008 21:01:57 GMT
Author: chirino
Date: Tue Mar  4 13:01:41 2008
New Revision: 633639

URL: http://svn.apache.org/viewvc?rev=633639&view=rev
Log:
This is an experiment in trying to simplify the core routing logic in the AcitveMQ broker
using leasons learnt over time.  Will add more doco to a README file soon.  


Added:
    activemq/sandbox/activemq-router/activemq.patch
    activemq/sandbox/activemq-router/pom.xml
    activemq/sandbox/activemq-router/src/
    activemq/sandbox/activemq-router/src/main/
    activemq/sandbox/activemq-router/src/main/java/
    activemq/sandbox/activemq-router/src/main/java/org/
    activemq/sandbox/activemq-router/src/main/java/org/apache/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterBrokerService.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterClientConnection.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientConnection.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/DestinationManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/RequestContext.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/MultiDestinationClientSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/QualityOfService.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/Router.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/RouterFactory.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/SimpleDestinationManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/TransportClientConnection.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/BroadcastQueue.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/queue/QueueSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/Topic.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/topic/TopicSubscription.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/DataIndexManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/Index.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/IndexManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/ReferenceIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/api/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaDataIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaDataIndexManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaIndexManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/JpaReferenceIndex.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/model/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/model/IndexRecord.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/model/Record.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/model/ReferenceRecord.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/model/StoreMetadata.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/model/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/index/jpa/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/CacheEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/DataStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/DataStoreManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/ReferenceStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/Store.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/StoreManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/api/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalDataStoreManagerFactory.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalReferenceStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/JournalStoreManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/MyH2Dictionary.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/AddMessageJournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/Journal.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/JournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveMessageJournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/RemoveReferenceJournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreJournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/StoreTxJournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TraceJournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/TxJournalEntry.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/data/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/journal/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryDataStoreManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryReferenceStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStore.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/MemoryStoreManager.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/store/memory/package.html
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/DuplicateAndMissedChecker.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNode.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/LinkedNodeList.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/OrderChecker.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selectable.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Selector.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SelectorThreadPool.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Sequence.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/SequenceSet.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/Usage.java
    activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/util/package.html
    activemq/sandbox/activemq-router/src/main/resources/
    activemq/sandbox/activemq-router/src/main/resources/META-INF/
    activemq/sandbox/activemq-router/src/main/resources/META-INF/persistence.xml
    activemq/sandbox/activemq-router/src/test/
    activemq/sandbox/activemq-router/src/test/java/
    activemq/sandbox/activemq-router/src/test/java/org/
    activemq/sandbox/activemq-router/src/test/java/org/apache/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/api/StubDestination.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexTestSupport.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalQueueTests.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/OriginalTopicTests.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/QueueTests.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/perf/TopicTests.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/queue/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/queue/TheoryTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/data/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/data/JournalTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/util/
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/util/DuplicateAndMissedCheckerTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/util/LinkedNodeTest.java
    activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/util/OrderCheckerTest.java
    activemq/sandbox/activemq-router/src/test/resources/
    activemq/sandbox/activemq-router/src/test/resources/log4j.properties

Added: activemq/sandbox/activemq-router/activemq.patch
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/activemq.patch?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/activemq.patch (added)
+++ activemq/sandbox/activemq-router/activemq.patch Tue Mar  4 13:01:41 2008
@@ -0,0 +1,101 @@
+Index: activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
+===================================================================
+--- activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java	(revision 633603)
++++ activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java	(working copy)
+@@ -125,7 +125,7 @@
+     private SystemUsage consumerSystemUsaage;
+     private PersistenceAdapter persistenceAdapter;
+     private PersistenceAdapterFactory persistenceFactory;
+-    private DestinationFactory destinationFactory;
++    protected DestinationFactory destinationFactory;
+     private MessageAuthorizationPolicy messageAuthorizationPolicy;
+     private List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
+     private List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
+@@ -1358,11 +1358,11 @@
+     }
+     
+     protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+-        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+-        if (mbeanServer != null) {
+-
+-          
+-        }
++//        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
++//        if (mbeanServer != null) {
++//
++//          
++//        }
+         return adaptor;
+     }
+ 
+@@ -1484,8 +1484,7 @@
+             }
+         };
+ 
+-        RegionBroker rBroker = (RegionBroker)regionBroker;
+-        rBroker.getDestinationStatistics().setEnabled(enableStatistics);
++//        RegionBroker rBroker = (RegionBroker)regionBroker;
+ 
+         if (isUseJmx()) {
+             ManagedRegionBroker managedBroker = (ManagedRegionBroker)regionBroker;
+@@ -1525,10 +1524,14 @@
+         configureServices(destinationInterceptors);
+ 
+         DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
+-        RegionBroker regionBroker = null;
+         if (destinationFactory == null) {
+             destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
+         }
++        return createRegionBroker(destinationInterceptor);
++    }
++    
++    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
++ 		    RegionBroker regionBroker;
+         if (isUseJmx()) {
+             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+             regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
+@@ -1540,9 +1543,11 @@
+ 
+         regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
+         regionBroker.setBrokerName(getBrokerName());
+-        return regionBroker;
+-    }
++        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
+ 
++		    return regionBroker;
++	}
++
+     /**
+      * Create the default destination interceptor
+      */
+Index: activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
+===================================================================
+--- activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java	(revision 633603)
++++ activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java	(working copy)
+@@ -726,7 +726,7 @@
+     }
+ 
+     public void dispatchSync(Command message) {
+-        getStatistics().getEnqueues().increment();
++//        getStatistics().getEnqueues().increment();
+         try {
+             processDispatch(message);
+         } catch (IOException e) {
+@@ -736,7 +736,7 @@
+ 
+     public void dispatchAsync(Command message) {
+         if (!stopped.get()) {
+-            getStatistics().getEnqueues().increment();
++//            getStatistics().getEnqueues().increment();
+             if (taskRunner == null) {
+                 dispatchSync(message);
+             } else {
+@@ -777,7 +777,7 @@
+                     sub.run();
+                 }
+             }
+-            getStatistics().getDequeues().increment();
++//            getStatistics().getDequeues().increment();
+         }
+     }
+ 

Added: activemq/sandbox/activemq-router/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/pom.xml?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/pom.xml (added)
+++ activemq/sandbox/activemq-router/pom.xml Tue Mar  4 13:01:41 2008
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xmlns="http://maven.apache.org/POM/4.0.0">
+  
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>activemq-parent</artifactId>
+    <version>5.1-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>activemq-router</artifactId>
+  <packaging>jar</packaging>
+  <name>ActiveMQ :: Router</name>
+
+  <dependencies>
+    <!-- activemq -->
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activemq-core</artifactId>
+    </dependency>
+    
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activemq-core</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.openjpa</groupId>
+      <artifactId>openjpa</artifactId>
+      <version>1.0.2</version>
+    </dependency>
+        
+    <dependency>
+      <groupId>commons-dbcp</groupId>
+      <artifactId>commons-dbcp</artifactId>
+      <version>1.2.2</version>
+      <optional>true</optional>
+    </dependency>
+    
+	<dependency>
+	    <groupId>hsqldb</groupId>
+	    <artifactId>hsqldb</artifactId>
+	    <version>1.8.0.7</version>
+	    <optional>true</optional>
+	</dependency>
+
+	<dependency>
+	    <groupId>com.h2database</groupId>
+	    <artifactId>h2</artifactId>
+	    <version>1.0.67</version>
+	</dependency>
+	
+<!-- 
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-primitives</groupId>
+      <artifactId>commons-primitives</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+      <optional>true</optional>
+    </dependency>
+ -->
+ 
+    <dependency>
+      <groupId>org.apache.xbean</groupId>
+      <artifactId>xbean-spring</artifactId>
+      <optional>true</optional>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+	<!-- For testing -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+  
+  <build>
+    <plugins>
+
+<!-- 
+      <plugin>
+        <groupId>org.apache.xbean</groupId>
+        <artifactId>maven-xbean-plugin</artifactId>
+        <version>${xbean-version}</version>
+        <executions>
+          <execution>
+            <configuration>
+              <namespace>http://activemq.org/activemq-jpa-store/config/1.0</namespace>
+              <schema>target/xbean/activemq-jpa-store.xsd</schema>
+            </configuration>
+            <goals>
+              <goal>mapping</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+ -->
+    
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+
+        <executions>
+          <execution>
+            <phase>process-classes</phase>
+            <configuration>
+              <tasks>
+                <path id="cp">
+                  <path refid="maven.test.classpath"/>
+                  <path refid="maven.compile.classpath"/>
+                  <path refid="maven.dependency.classpath"/>
+                </path>
+                <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask">
+                  <classpath refid="cp"/>
+                </taskdef>
+                <openjpac directory="${basedir}/target/jpa-classes">
+                  <classpath refid="cp"/>
+                  <fileset dir="${basedir}/target/classes">
+                    <include name="org/apache/activemq/broker/router/index/jpa/model/*.class"/>
+                  </fileset>
+                </openjpac>
+                <copy todir="${basedir}/target/classes">
+                  <fileset dir="${basedir}/target/jpa-classes"/>
+                </copy>
+              </tasks>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <!-- 
+          <forkMode>pertest</forkMode>
+          -->
+          <forkMode>once</forkMode>
+          <childDelegation>false</childDelegation>
+          <useFile>true</useFile>
+          <argLine>-Xmx512M</argLine>
+          <systemProperties>
+            <property>
+              <name>org.apache.activemq.default.directory.prefix</name>
+              <value>target/</value>
+            </property>
+          </systemProperties>
+          <includes>
+            <include>**/*Test.*</include>
+          </includes>
+          <excludes>
+            <!-- These are performance tests and are not functional unit tests -->
+            <exclude>**/broker/router/performance/**</exclude>
+          </excludes>
+          
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterBrokerService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterBrokerService.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterBrokerService.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterBrokerService.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.region.DestinationInterceptor;
+
+/**
+ * 
+ * @author chirino
+ */
+public class RouterBrokerService extends BrokerService {
+    private RouterRegionBroker regionBroker;
+
+    @Override
+    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
+        regionBroker = new RouterRegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
+        regionBroker.setBrokerName(getBrokerName());
+        return regionBroker;
+    }
+
+    public RouterRegionBroker getRegionBroker() {
+        return regionBroker;
+    }
+
+    public void setRegionBroker(RouterRegionBroker regionBroker) {
+        this.regionBroker = regionBroker;
+    }
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterClientConnection.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterClientConnection.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterClientConnection.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.router.api.ClientConnection;
+import org.apache.activemq.broker.router.api.ClientConnection.TransmitQueue;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Message;
+
+/**
+ * 
+ * @author chirino
+ */
+public class RouterClientConnection implements ClientConnection, TransmitQueue {
+
+    private final ConnectionContext context;
+
+    public RouterClientConnection(ConnectionContext context) {
+        this.context = context;
+    }
+
+    public TransmitQueue getTransmissionQueue() {
+        return this;
+    }
+
+    public boolean isAllowedToConsume(Message node) {
+        return true;
+    }
+
+    public void start() throws Exception {
+        context.getConnection().start();
+    }
+
+    public void stop() throws Exception {
+        context.getConnection().stop();
+    }
+
+    public void enqueue(Command command) {
+        context.getConnection().dispatchAsync(command);
+    }
+
+    public void enqueueFirst(Command command) {
+        context.getConnection().dispatchAsync(command);
+    }
+
+    public ConnectionContext getConnectionContext() {
+        return context;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterRegionBroker.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.broker.region.DestinationFactory;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.broker.router.core.RouterFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * One day we should revisit the RegionBroker concept. But since the existing
+ * connectors use it.. let override it's implementation so that we use the
+ * Router instead.
+ * 
+ * @author chirino
+ */
+public class RouterRegionBroker extends RegionBroker {
+    private static final Log LOG = LogFactory.getLog(RouterRegionBroker.class);
+    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
+
+    private final BrokerService brokerService;
+    private final AtomicBoolean stopped = new AtomicBoolean();
+    private ConnectionContext adminConnectionContext;
+    private BrokerId brokerId;
+    private Router router;
+
+    private final Map<String, RouterClientConnection> clientIdSet = new ConcurrentHashMap<String, RouterClientConnection>();
+    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
+    private final Map<ConsumerId, ClientSubscription> subscriptions = new ConcurrentHashMap<ConsumerId, ClientSubscription>();
+
+    public RouterRegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
+            DestinationInterceptor destinationInterceptor) throws IOException {
+        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
+        this.brokerService = brokerService;
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    //
+    // Properties
+    //
+    // /////////////////////////////////////////////////////////////////////////////
+    //
+    // Lifecycle Operations...
+    //
+    // /////////////////////////////////////////////////////////////////////////////
+    public void start() throws Exception {
+        RouterFactory factory = new RouterFactory();
+        factory.setBrokerName(getBrokerName());
+        router = factory.createRouter();
+        router.start();
+    }
+
+    public void stop() throws Exception {
+        router.stop();
+    }
+
+    public void brokerServiceStarted() {
+    }
+
+    public void gc() {
+        router.fushCaches();
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    //
+    // Client State Operations
+    //
+    // /////////////////////////////////////////////////////////////////////////////
+    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
+        super.addConnection(context, info);
+        clientIdSet.put(info.getClientId(), new RouterClientConnection(context));
+    }
+
+    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
+        super.removeConnection(context, info, error);
+        clientIdSet.remove(info.getClientId());
+    }
+
+    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
+        // Look up the destination.. if it does not exist fail.
+        if (info.getDestination() != null) {
+            Set<Destination> destinations = router.getDestinationManager().getDestinations(info.getDestination());
+            for (Destination destination : destinations) {
+                destination.getProducerCounter().incrementAndGet();
+            }
+        }
+    }
+
+    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
+        if (info.getDestination() != null) {
+            Set<Destination> destinations = router.getDestinationManager().getDestinations(info.getDestination());
+            for (Destination destination : destinations) {
+                destination.getProducerCounter().decrementAndGet();
+            }
+        }
+    }
+
+    public org.apache.activemq.broker.region.Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+        RouterClientConnection client;
+        client = clientIdSet.get(context.getClientId());
+
+        ClientSubscription subscription = router.createSubscription(info);
+        subscription.setClientConnection(client);
+        subscriptions.put(info.getConsumerId(), subscription);
+        subscription.start();
+        return new RouterSubscription(subscription);
+
+    }
+
+    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+        ClientSubscription subscription = router.getSubscription(info.getConsumerId());
+        if (subscription != null) {
+            router.removeSubscription(subscription);
+            subscription.stop();
+        }
+    }
+
+    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    //
+    // Destination Management Operations
+    //
+    // /////////////////////////////////////////////////////////////////////////////
+    public ActiveMQDestination[] getDestinations() throws Exception {
+        return super.getDestinations();
+    }
+
+    public Set<ActiveMQDestination> getDurableDestinations() {
+        return null;
+    }
+
+    public Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap() {
+        return null;
+    }
+
+    public Set<org.apache.activemq.broker.region.Destination> getDestinations(ActiveMQDestination destination) {
+        return null;
+    }
+
+    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
+    }
+
+    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
+    }
+
+    public org.apache.activemq.broker.region.Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
+        return null;
+    }
+
+    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    //
+    // Core Messaging Operations
+    //
+    // /////////////////////////////////////////////////////////////////////////////
+    public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
+
+        final RouterClientConnection client;
+        client = clientIdSet.get(producerExchange.getConnectionContext().getClientId());
+
+        // Don't auto response.. we will send a manual response once the message
+        // is securely on disk.
+        producerExchange.getConnectionContext().setDontSendReponse(true);
+
+        ActiveMQDestination name = message.getDestination();
+        final Set<Destination> destinations = router.getDestinationManager().getDestinations(name);
+
+        Runnable completionHandler = null;
+        if (message.isResponseRequired()) {
+
+            // TODO: this impl is making an assumption that the
+            // completionHandler is only
+            // going to get called once per destination. Need to verify that
+            // assumption.
+
+            // Send the message response once all destinations have secured the
+            // message.
+            completionHandler = new Runnable() {
+                AtomicInteger completeCounter = new AtomicInteger(destinations.size());
+
+                public void run() {
+                    if (completeCounter.decrementAndGet() == 0) {
+                        Response response = new Response();
+                        response.setCorrelationId(message.getCommandId());
+                        client.getTransmissionQueue().enqueueFirst(response);
+                    }
+                }
+            };
+        }
+
+        RequestContext requestContext = new RequestContext();
+        requestContext.router = router;
+        requestContext.clientConnection = client;
+        requestContext.command = message;
+        requestContext.connectionContext = producerExchange.getConnectionContext();
+
+        for (Destination destination : destinations) {
+            destination.enqueue(requestContext, message, completionHandler);
+        }
+    }
+
+    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
+
+        final RouterClientConnection client;
+        client = clientIdSet.get(consumerExchange.getConnectionContext().getClientId());
+
+        RequestContext requestContext = new RequestContext();
+        requestContext.router = router;
+        requestContext.clientConnection = client;
+        requestContext.command = ack;
+
+        ClientSubscription subscription = subscriptions.get(ack.getConsumerId());
+        subscription.acknowledge(requestContext, ack);
+    }
+
+    public void messageExpired(ConnectionContext context, org.apache.activemq.broker.region.MessageReference messageReference) {
+    }
+
+    public void postProcessDispatch(MessageDispatch messageDispatch) {
+    }
+
+    public void preProcessDispatch(MessageDispatch messageDispatch) {
+    }
+
+    public void sendToDeadLetterQueue(ConnectionContext context, org.apache.activemq.broker.region.MessageReference messageReference) {
+    }
+
+    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
+        return null;
+    }
+
+    public boolean isExpired(org.apache.activemq.broker.region.MessageReference messageReference) {
+        return false;
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    //
+    // Network/Cluster Operations
+    //
+    // /////////////////////////////////////////////////////////////////////////////
+    public void addBroker(Connection connection, BrokerInfo info) {
+    }
+
+    public void removeBroker(Connection connection, BrokerInfo info) {
+    }
+
+    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
+    }
+
+    public BrokerInfo[] getPeerBrokerInfos() {
+        return null;
+    }
+
+    // /////////////////////////////////////////////////////////////////////////////
+    //
+    // Transaction Operations...
+    //
+    // /////////////////////////////////////////////////////////////////////////////
+    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+    }
+
+    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+    }
+
+    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
+    }
+
+    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+        return 0;
+    }
+
+    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
+    }
+
+    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
+        return null;
+    }
+
+    public Router getRouter() {
+        return router;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterSubscription.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/RouterSubscription.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.InvalidSelectorException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * 
+ * @author chirino
+ */
+public class RouterSubscription implements Subscription {
+
+    private final org.apache.activemq.broker.router.api.ClientSubscription sub;
+
+    public RouterSubscription(org.apache.activemq.broker.router.api.ClientSubscription sub) {
+        this.sub = sub;
+    }
+
+    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void add(MessageReference node) throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void add(ConnectionContext context, Destination destination) throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void destroy() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void gc() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public ConsumerInfo getConsumerInfo() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public long getDequeueCounter() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public long getDispatchedCounter() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public int getDispatchedQueueSize() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public long getEnqueueCounter() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public ObjectName getObjectName() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public int getPendingQueueSize() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public int getPrefetchSize() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public String getSelector() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public boolean isHighWaterMark() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public boolean isLowWaterMark() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public boolean isRecoveryRequired() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public boolean isSlave() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public boolean matches(ActiveMQDestination destination) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public void remove(ConnectionContext context, Destination destination) throws Exception {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void setObjectName(ObjectName objectName) {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void updateConsumerPrefetch(int newPrefetch) {
+        // TODO Auto-generated method stub
+
+    }
+
+    public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public ActiveMQDestination getActiveMQDestination() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public List<MessageReference> getInFlightMessages() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public int getInFlightSize() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public int getInFlightUsage() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    public boolean isBrowser() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    public boolean isFull() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/package.html?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/package.html (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/package.html Tue Mar  4 13:01:41 2008
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Provides a BrokerService which uses a Router back end to route the messages.  Right now this mostly bridge code 
+from the existing BrokerSevice API to the new Router APIs.  We should look at cleaning up the these interfaces
+to avoid some of the leaky abstractions.
+
+</body>
+</html>

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientConnection.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientConnection.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientConnection.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.api;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Message;
+
+/**
+ * 
+ * @author chirino
+ */
+public interface ClientConnection {
+
+    public interface TransmitQueue {
+        public void enqueue(Command command);
+
+        public void enqueueFirst(Command command);
+    }
+
+    public void start() throws Exception;
+
+    public void stop() throws Exception;
+
+    public TransmitQueue getTransmissionQueue();
+
+    /**
+     * Called by a Subscription
+     * 
+     * @param node
+     * @return
+     */
+    public boolean isAllowedToConsume(Message node);
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/ClientSubscription.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.api;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * 
+ * @author chirino
+ */
+public interface ClientSubscription extends Service {
+
+    void acknowledge(RequestContext requestContext, MessageAck messageAck) throws Exception;
+
+    void setClientConnection(ClientConnection clientConnection);
+
+    boolean matches(ActiveMQDestination name);
+
+    public boolean matches(Message node, MessageEvaluationContext context) throws IOException;
+
+    ActiveMQDestination getDestinationName();
+
+    ConsumerInfo getInfo();
+
+    public boolean offer(StoreSubscription source, CacheEntry reference) throws Exception;
+
+    public void offer(StoreSubscription source, List<CacheEntry> list) throws Exception;
+
+    public void addStoreSubscription(StoreSubscription destination);
+
+    public void removeStoreSubscription(StoreSubscription destination);
+
+    public void wakeup();
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/Destination.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.api;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.usage.SystemUsage;
+
+/**
+ * 
+ * @author chirino
+ */
+public interface Destination extends Service {
+
+    public void addSubscription(ClientSubscription subscription) throws Exception;
+
+    public void removeSubscription(ClientSubscription subscription) throws Exception;
+
+    public Router getRouter();
+
+    public ActiveMQDestination getName();
+
+    public void enqueue(RequestContext requestContext, Message message, Runnable onStored) throws Exception;
+
+    public boolean lockForDispatch(ClientSubscription source, CacheEntry ref);
+
+    public void dequeue(RequestContext context, MessageAck ack, DataStore dataStore, long storeId) throws Exception;
+
+    public AtomicInteger getProducerCounter();
+
+    public AtomicInteger getConsumerCounter();
+
+    public AtomicInteger getEnqueueCounter();
+
+    public SystemUsage getSystemUsage();
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/DestinationManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/DestinationManager.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/DestinationManager.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/DestinationManager.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.api;
+
+import java.util.Set;
+
+import org.apache.activemq.broker.router.core.SimpleDestinationManager.DestinationManagerCallback;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * 
+ * @author chirino
+ */
+public interface DestinationManager {
+
+    public abstract Set<Destination> getDestinations(ActiveMQDestination name) throws Exception;
+
+    public abstract Destination createDestination(ActiveMQDestination name) throws Exception;
+
+    public abstract void destroyDestination(ActiveMQDestination name, boolean force) throws Exception;
+
+    public abstract boolean isAutoCreate();
+
+    public abstract void setAutoCreate(boolean autoCreate);
+
+    public abstract Object getMutex();
+
+    public DestinationManagerCallback getDestinationManagerCallback();
+
+    public void setDestinationManagerCallback(DestinationManagerCallback destinationManagerCallback);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/RequestContext.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/RequestContext.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/RequestContext.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/RequestContext.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.api;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.router.core.Router;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transaction.Transaction;
+
+/**
+ * 
+ * @author chirino
+ */
+public class RequestContext {
+
+    public Router router;
+    public ClientConnection clientConnection;
+    public boolean autoRespond = true;
+    public Command command;
+    public Transaction transaction;
+
+    public ConnectionContext connectionContext;
+
+    /**
+     * @return the transaction being used.
+     */
+    public Transaction getTransaction() {
+        return transaction;
+    }
+
+    /**
+     * @param transaction
+     *            being used.
+     */
+    public void setTransaction(Transaction transaction) {
+        this.transaction = transaction;
+    }
+
+    public boolean isInTransaction() {
+        return transaction != null;
+    }
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/StoreSubscription.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.api;
+
+/**
+ * 
+ * @author chirino
+ */
+public interface StoreSubscription {
+    public void wakeup();
+
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/package.html?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/package.html (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/api/package.html Tue Mar  4 13:01:41 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+The interfaces of the core components in the Router. 
+
+</body>
+</html>

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastDestination.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.Destination;
+import org.apache.activemq.broker.router.api.RequestContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.usage.SystemUsage;
+
+/**
+ * This Destination implementation broadcasts messages to all subscribers.
+ * 
+ * In the queue case the subscriber gets an exclusive lock before dispatching
+ * the message to the consumer. This works well for a small number of consumers
+ * or if the consumers are using selectors to partition the messages between the
+ * consumers.
+ * 
+ * @author chirino
+ */
+abstract public class BroadcastDestination implements Destination, org.apache.activemq.Service {
+
+    protected final Router router;
+    protected final ActiveMQDestination name;
+    private final SystemUsage systemUsage = new SystemUsage();
+
+    private final AtomicInteger producerCounter = new AtomicInteger();
+    private final AtomicInteger consumerCounter = new AtomicInteger();
+    private final AtomicInteger enqueueCounter = new AtomicInteger();
+
+    public BroadcastDestination(Router router, ActiveMQDestination name) {
+        this.router = router;
+        this.name = name;
+        systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 20);
+    }
+
+    public Router getRouter() {
+        return router;
+    }
+
+    public ActiveMQDestination getName() {
+        return name;
+    }
+
+    public AtomicInteger getProducerCounter() {
+        return producerCounter;
+    }
+
+    public AtomicInteger getConsumerCounter() {
+        return consumerCounter;
+    }
+
+    public AtomicInteger getEnqueueCounter() {
+        return enqueueCounter;
+    }
+
+    public SystemUsage getSystemUsage() {
+        return systemUsage;
+    }
+
+    public void enqueue(RequestContext ctx, Message message, Runnable onStored) throws Exception {
+        while (!systemUsage.getMemoryUsage().waitForSpace(100)) {
+            if (ctx.connectionContext.getStopping().get()) {
+                throw new IOException("Connection closed, enqueue aborted.");
+            }
+        }
+        enqueueCounter.incrementAndGet();
+        QualityOfService qos = chooseQosFor(message);
+        qos.enqueue(ctx, message, onStored);
+    }
+
+    public void addSubscription(ClientSubscription subscription) throws Exception {
+        List<QualityOfService> qoses = getAllQos();
+        for (QualityOfService qos : qoses) {
+            qos.addSubscription(subscription);
+        }
+    }
+
+    public void removeSubscription(ClientSubscription subscription) throws Exception {
+        List<QualityOfService> qoses = getAllQos();
+        for (QualityOfService qos : qoses) {
+            qos.removeSubscription(subscription);
+        }
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+
+    abstract protected QualityOfService chooseQosFor(Message message);
+
+    abstract protected List<QualityOfService> getAllQos();
+}

Added: activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java?rev=633639&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java (added)
+++ activemq/sandbox/activemq-router/src/main/java/org/apache/activemq/broker/router/core/BroadcastStoreSubscription.java Tue Mar  4 13:01:41 2008
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.core;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.broker.router.api.ClientSubscription;
+import org.apache.activemq.broker.router.api.StoreSubscription;
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+import org.apache.activemq.broker.router.util.Selectable;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * 
+ * @author chirino
+ */
+public class BroadcastStoreSubscription implements StoreSubscription {
+
+    final private Log log;
+
+    private final int maxPrefetch;
+    private final DataStore dataStore;
+    private final ClientSubscription clientSubscription;
+    private final ReferenceStore store;
+    private final LinkedList<CacheEntry> prefetch;
+
+    // Used to track recovery of the subscription...
+    private boolean recoveringFromDataStore;
+    private CacheEntry lastRecover;
+    private CacheEntry recoverPosition;
+    private CacheEntry storePosition;
+
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+    private boolean enqueueToStore;
+    private AtomicBoolean dequeueFromStore = new AtomicBoolean();
+
+    private Selectable<Runnable> storePumpTask;
+
+    public BroadcastStoreSubscription(DataStore dataStore, ReferenceStore store, ClientSubscription clientSubscription) {
+        this.dataStore = dataStore;
+        this.store = store;
+        this.clientSubscription = clientSubscription;
+        this.maxPrefetch = Math.max(1, clientSubscription.getInfo().getPrefetchSize());
+        this.prefetch = new LinkedList<CacheEntry>();
+        this.log = LogFactory.getLog(BroadcastStoreSubscription.class.getName() + ".consumer=" + clientSubscription.getInfo().getConsumerId());
+    }
+
+    public ClientSubscription getClientSubscription() {
+        return clientSubscription;
+    }
+
+    public ReferenceStore getStore() {
+        return store;
+    }
+
+    public DataStore getDataStore() {
+        return dataStore;
+    }
+
+    public void start() throws Exception {
+        storePumpTask = dataStore.getDestination().getRouter().getDestinationPrefetchSelector().create(new Runnable() {
+            final AtomicInteger runCounter = new AtomicInteger();
+
+            public void run() {
+                try {
+                    assert runCounter.incrementAndGet() == 1 : "Run counter was not 1";
+                    pump();
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                } finally {
+                    runCounter.decrementAndGet();
+                }
+            }
+        });
+        wakeup();
+    }
+
+    public void stop() throws Exception {
+        storePumpTask.close();
+    }
+
+    public boolean offer(CacheEntry cacheEntry) throws Exception {
+        rwl.readLock().lock();
+        try {
+            if (enqueueToStore) {
+                store.addReference(cacheEntry);
+            } else {
+                if (!clientSubscription.offer(this, cacheEntry)) {
+                    rwl.readLock().unlock();
+                    rwl.writeLock().lock();
+                    try {
+                        if (!enqueueToStore) {
+                            log.debug("Next is full..  spooling to store");
+                            enqueueToStore = true;
+                            storePumpTask.setEnabled(true);
+                        }
+                    } finally {
+                        rwl.readLock().lock();
+                        rwl.writeLock().unlock();
+                    }
+                    store.addReference(cacheEntry);
+                    // wakeup();
+                }
+            }
+        } finally {
+            rwl.readLock().unlock();
+        }
+        return true;
+    }
+
+    /**
+     * 
+     * @return true while the store is not empty.
+     * @throws Exception
+     */
+    public void pump() throws Exception {
+
+        if (!dequeueFromStore.getAndSet(false)) {
+            return;
+        }
+
+        log.debug("Pumping ");
+        for (;;) {
+            if (prefetch.isEmpty()) {
+                long start = System.currentTimeMillis();
+                load(prefetch, maxPrefetch - prefetch.size());
+                long end = System.currentTimeMillis();
+                log.debug("Loaded Prefetch: " + prefetch.size() + " msgs in " + (end - start) + " msec");
+            }
+            if (prefetch.isEmpty()) {
+                // Looks like the store is empty..
+                // try to enqueue directly to the next guy.
+                rwl.writeLock().lock();
+                try {
+                    load(prefetch, maxPrefetch - prefetch.size());
+                    if (prefetch.isEmpty()) {
+                        log.debug("store is empty.. will enqueue directly");
+                        enqueueToStore = false;
+                        return;
+                    } else {
+                        // Prefetch is not empty, while we are blocking message
+                        // dispatching.. see if we can empty it out so that
+                        // we can switch to direct dispatching..
+                        int size = prefetch.size();
+                        log.debug("Offering subscription " + size + " messages");
+                        clientSubscription.offer(this, prefetch);
+                        if (prefetch.isEmpty()) {
+                            // Yay it emptied out.. we can direct dispatch...
+                            log.debug("store is empty.. will enqueue directly");
+                            enqueueToStore = false;
+                            return;
+                        }
+                    }
+                } finally {
+                    rwl.writeLock().unlock();
+                }
+            }
+
+            // Try to offer those prefetch messages to the
+            // next guy. If he takes any he will remove those messages.
+            int size = prefetch.size();
+            log.debug("Offering subscription " + size + " messages");
+            clientSubscription.offer(this, prefetch);
+            if (prefetch.size() == size) {
+                log.debug("sub did not accept more message.. pump done.");
+                return;
+            }
+        }
+    }
+
+    public void wakeup() {
+        rwl.readLock().lock();
+        try {
+            if (enqueueToStore) {
+                log.debug("Enabling pump store task");
+                dequeueFromStore.set(true);
+                storePumpTask.setEnabled(true);
+            } else {
+                log.debug("Not enabling pump store task, we are doing direct dispatch");
+            }
+        } finally {
+            rwl.readLock().unlock();
+        }
+    }
+
+    /**
+     * We try really hard to avoid creating contention hear with the add()
+     * operation. The load() operation is slow and we should be able to
+     * concurrently add() while it's working.
+     * 
+     * @param target
+     * @param max
+     * @return
+     * @throws Exception
+     */
+    public void load(LinkedList<CacheEntry> target, int max) throws Exception {
+        int counter = 0;
+
+        // We have to keep looping because the subscription might not
+        // be interested in all the messages we initially load.
+        while (recoveringFromDataStore && counter < max) {
+
+            // This also loads the message into memory.
+
+            long start = System.currentTimeMillis();
+            int c = max - counter;
+            log.debug("Loading up to " + c + " msgs");
+            List<CacheEntry> data = dataStore.load(recoverPosition, lastRecover, c);
+            long end = System.currentTimeMillis();
+            log.debug("Loaded: " + data.size() + " msgs in " + (end - start) + " msec");
+
+            if (data.size() < max - counter) {
+                recoveringFromDataStore = false;
+            }
+
+            if (data.size() > 0) {
+                recoverPosition = data.get(data.size() - 1);
+            }
+
+            // Not every item in the data store may match the selection criteria
+            // of the
+            // subscription.. so check the message to see if they match.
+            ActiveMQDestination name = dataStore.getDestination().getName();
+            for (CacheEntry mr : data) {
+                try {
+                    // message could be null due to the store doing dirty
+                    // reads...
+                    Message message = mr.getMessage();
+                    MessageEvaluationContext mec = new MessageEvaluationContext();
+                    mec.setDestination(name);
+                    mec.setMessageReference(message);
+                    if (message != null && clientSubscription.matches(message, mec)) {
+                        counter++;
+                        target.add(mr);
+                    } else {
+                        // If it did not match.. then unload it.
+                        mr.unload();
+                    }
+                } catch (Throwable ignore) {
+                    // If it did not match.. then unload it.
+                    ignore.printStackTrace();
+                    mr.unload();
+                }
+            }
+        }
+
+        if (counter < max) {
+            long start = System.currentTimeMillis();
+            int c = max - counter;
+            log.debug("Loading up to " + c + " msgs");
+            List<CacheEntry> data = store.remove(storePosition, null, c);
+            long end = System.currentTimeMillis();
+            log.debug("Loaded: " + data.size() + " msgs in " + (end - start) + " msec");
+            if (data.size() > 0) {
+                storePosition = data.get(data.size() - 1);
+                counter += data.size();
+                target.addAll(data);
+            }
+        }
+    }
+
+    public void recoverUntil(CacheEntry until) throws Exception {
+        recoveringFromDataStore = true;
+        this.recoverPosition = store.getLastAddedEntry();
+        this.lastRecover = until;
+        this.enqueueToStore = true;
+    }
+
+    public boolean matches(Message message, MessageEvaluationContext ec) throws IOException {
+        return clientSubscription.matches(message, ec);
+    }
+
+}



Mime
View raw message