activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r901188 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/kahadb/plist/ ...
Date Wed, 20 Jan 2010 13:27:06 GMT
Author: rajdavies
Date: Wed Jan 20 13:27:05 2010
New Revision: 901188

URL: http://svn.apache.org/viewvc?rev=901188&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2575

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/PerfConsumer.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleNonPersistentQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleQueueTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Wed Jan 20 13:27:05 2010
@@ -32,7 +32,7 @@
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -266,7 +266,7 @@
     /**
      * @return the temp data store
      */
-    Store getTempDataStore();
+    PListStore getTempDataStore();
 
     /**
      * @return the URI that can be used to connect to the local Broker

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Wed Jan 20 13:27:05 2010
@@ -38,7 +38,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -230,7 +230,7 @@
         next.setAdminConnectionContext(adminConnectionContext);
     }
 
-    public Store getTempDataStore() {
+    public PListStore getTempDataStore() {
         return next.getTempDataStore();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jan 20 13:27:05 2010
@@ -64,8 +64,6 @@
 import org.apache.activemq.broker.scheduler.SchedulerBroker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.kaha.Store;
-import org.apache.activemq.kaha.StoreFactory;
 import org.apache.activemq.network.ConnectionFilter;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
@@ -77,6 +75,7 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transport.TransportFactory;
@@ -160,7 +159,7 @@
     private BrokerId brokerId;
     private DestinationInterceptor[] destinationInterceptors;
     private ActiveMQDestination[] destinations;
-    private Store tempDataStore;
+    private PListStore tempDataStore;
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
     private boolean useLocalHostBrokerName;
     private final CountDownLatch stoppedLatch = new CountDownLatch(1);
@@ -538,7 +537,7 @@
             stopper.stop(broker);
         }
         if (tempDataStore != null) {
-            tempDataStore.close();
+            tempDataStore.stop();
         }
         stopper.stop(persistenceAdapter);
         if (isUseJmx()) {
@@ -1330,7 +1329,7 @@
     /**
      * @return the tempDataStore
      */
-    public synchronized Store getTempDataStore() {
+    public synchronized PListStore getTempDataStore() {
         if (tempDataStore == null) {
             if (!isPersistent()) {
                 return null;
@@ -1355,8 +1354,10 @@
                     String str = result ? "Successfully deleted" : "Failed to delete";
                     LOG.info(str + " temporary storage");
                 }
-                tempDataStore = StoreFactory.open(getTmpDataDirectory(), "rw");
-            } catch (IOException e) {
+                this.tempDataStore = new PListStore();
+                this.tempDataStore.setDirectory(getTmpDataDirectory());
+                this.tempDataStore.start();
+            } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
@@ -1367,7 +1368,7 @@
      * @param tempDataStore
      *            the tempDataStore to set
      */
-    public void setTempDataStore(Store tempDataStore) {
+    public void setTempDataStore(PListStore tempDataStore) {
         this.tempDataStore = tempDataStore;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Wed Jan 20 13:27:05 2010
@@ -39,7 +39,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -225,7 +225,7 @@
         return null;
     }
 
-    public Store getTempDataStore() {
+    public PListStore getTempDataStore() {
         return null;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Wed Jan 20 13:27:05 2010
@@ -39,7 +39,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -233,7 +233,7 @@
         throw new BrokerStoppedException(this.message);
     }
 
-    public Store getTempDataStore() {
+    public PListStore getTempDataStore() {
         throw new BrokerStoppedException(this.message);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Wed Jan 20 13:27:05 2010
@@ -20,7 +20,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -40,7 +39,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -243,7 +242,7 @@
         return getNext().messagePull(context, pull);
     }
 
-    public Store getTempDataStore() {
+    public PListStore getTempDataStore() {
         return getNext().getTempDataStore();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jan 20 13:27:05 2010
@@ -25,7 +25,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import org.apache.activemq.broker.Broker;
@@ -53,8 +52,8 @@
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;
@@ -112,12 +111,14 @@
         tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
     }
 
+    @Override
     public Map<ActiveMQDestination, Destination> getDestinationMap() {
         Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
         answer.putAll(getTopicRegion().getDestinationMap());
         return answer;
     }
 
+    @Override
     public Set <Destination> getDestinations(ActiveMQDestination destination) {
         switch (destination.getDestinationType()) {
         case ActiveMQDestination.QUEUE_TYPE:
@@ -133,6 +134,7 @@
         }
     }
 
+    @Override
     public Broker getAdaptor(Class type) {
         if (type.isInstance(this)) {
             return this;
@@ -172,6 +174,7 @@
         return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
+    @Override
     public void start() throws Exception {
         ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
         started = true;
@@ -181,6 +184,7 @@
         tempTopicRegion.start();
     }
 
+    @Override
     public void stop() throws Exception {
         started = false;
         ServiceStopper ss = new ServiceStopper();
@@ -197,6 +201,7 @@
         return brokerService != null ? brokerService.getDestinationPolicy() : null;
     }
 
+    @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
         String clientId = info.getClientId();
         if (clientId == null) {
@@ -224,6 +229,7 @@
         connections.add(context.getConnection());
     }
 
+    @Override
     public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
         String clientId = info.getClientId();
         if (clientId == null) {
@@ -247,6 +253,7 @@
         return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
     }
 
+    @Override
     public Connection[] getClients() throws Exception {
         ArrayList<Connection> l = new ArrayList<Connection>(connections);
         Connection rc[] = new Connection[l.size()];
@@ -254,6 +261,7 @@
         return rc;
     }
 
+    @Override
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
 
         Destination answer;
@@ -285,6 +293,7 @@
 
     }
 
+    @Override
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
 
         if (destinations.containsKey(destination)) {
@@ -309,16 +318,19 @@
 
     }
 
+    @Override
     public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         addDestination(context, info.getDestination());
 
     }
 
+    @Override
     public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
         removeDestination(context, info.getDestination(), info.getTimeout());
 
     }
 
+    @Override
     public ActiveMQDestination[] getDestinations() throws Exception {
         ArrayList<ActiveMQDestination> l;
 
@@ -329,6 +341,7 @@
         return rc;
     }
 
+    @Override
     public void addProducer(ConnectionContext context, ProducerInfo info)
             throws Exception {
         ActiveMQDestination destination = info.getDestination();
@@ -353,6 +366,7 @@
         }
     }
 
+    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
         if (destination != null) {
@@ -373,6 +387,7 @@
         }
     }
 
+    @Override
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
         switch (destination.getDestinationType()) {
@@ -393,6 +408,7 @@
         }
     }
 
+    @Override
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         ActiveMQDestination destination = info.getDestination();
         switch (destination.getDestinationType()) {
@@ -413,10 +429,12 @@
         }
     }
 
+    @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
         topicRegion.removeSubscription(context, info);
     }
 
+    @Override
     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
         message.setBrokerInTime(System.currentTimeMillis());
         if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
@@ -445,6 +463,7 @@
         producerExchange.getRegion().send(producerExchange, message);
     }
 
+    @Override
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
             ActiveMQDestination destination = ack.getDestination();
@@ -470,6 +489,7 @@
         consumerExchange.getRegion().acknowledge(consumerExchange, ack);
     }
 
+    @Override
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
         ActiveMQDestination destination = pull.getDestination();
         switch (destination.getDestinationType()) {
@@ -489,35 +509,43 @@
         }
     }
 
+    @Override
     public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
         throw new IllegalAccessException("Transaction operation not implemented by this broker.");
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         throw new IllegalAccessException("Transaction operation not implemented by this broker.");
     }
 
+    @Override
     public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         throw new IllegalAccessException("Transaction operation not implemented by this broker.");
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
         throw new IllegalAccessException("Transaction operation not implemented by this broker.");
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
         throw new IllegalAccessException("Transaction operation not implemented by this broker.");
     }
 
+    @Override
     public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
         throw new IllegalAccessException("Transaction operation not implemented by this broker.");
     }
 
+    @Override
     public void gc() {
         queueRegion.gc();
         topicRegion.gc();
     }
 
+    @Override
     public BrokerId getBrokerId() {
         if (brokerId == null) {
             brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
@@ -529,6 +557,7 @@
         this.brokerId = brokerId;
     }
 
+    @Override
     public String getBrokerName() {
         if (brokerName == null) {
             try {
@@ -552,22 +581,26 @@
         return new JMSException("Unknown destination type: " + destination.getDestinationType());
     }
 
+    @Override
     public synchronized void addBroker(Connection connection, BrokerInfo info) {
         brokerInfos.add(info);
     }
 
+    @Override
     public synchronized void removeBroker(Connection connection, BrokerInfo info) {
         if (info != null) {
             brokerInfos.remove(info);
         }
     }
 
+    @Override
     public synchronized BrokerInfo[] getPeerBrokerInfos() {
         BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
         result = brokerInfos.toArray(result);
         return result;
     }
 
+    @Override
     public void preProcessDispatch(MessageDispatch messageDispatch) {
         Message message = messageDispatch.getMessage();
         if (message != null) {
@@ -580,9 +613,11 @@
         }
     }
 
+    @Override
     public void postProcessDispatch(MessageDispatch messageDispatch) {
     }
 
+    @Override
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
         ActiveMQDestination destination = messageDispatchNotification.getDestination();
         switch (destination.getDestinationType()) {
@@ -607,10 +642,12 @@
         return brokerService.isSlave();
     }
 
+    @Override
     public boolean isStopped() {
         return !started;
     }
 
+    @Override
     public Set<ActiveMQDestination> getDurableDestinations() {
         return destinationFactory.getDestinations();
     }
@@ -634,10 +671,12 @@
         return destinationInterceptor;
     }
 
+    @Override
     public ConnectionContext getAdminConnectionContext() {
         return adminConnectionContext;
     }
 
+    @Override
     public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
         this.adminConnectionContext = adminConnectionContext;
     }
@@ -646,21 +685,26 @@
         return connectionStates;
     }
 
-    public Store getTempDataStore() {
+    @Override
+    public PListStore getTempDataStore() {
         return brokerService.getTempDataStore();
     }
 
+    @Override
     public URI getVmConnectorURI() {
         return brokerService.getVmConnectorURI();
     }
 
+    @Override
     public void brokerServiceStarted() {
     }
 
+    @Override
     public BrokerService getBrokerService() {
         return brokerService;
     }
 
+    @Override
     public boolean isExpired(MessageReference messageReference) {
         boolean expired = false;
         if (messageReference.isExpired()) {
@@ -688,6 +732,7 @@
     }
 
     
+    @Override
     public void messageExpired(ConnectionContext context, MessageReference node) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Message expired " + node);
@@ -695,6 +740,7 @@
         getRoot().sendToDeadLetterQueue(context, node);
     }
     
+    @Override
     public void sendToDeadLetterQueue(ConnectionContext context,
 	        MessageReference node){
 		try{
@@ -739,6 +785,7 @@
 		}
 	}
 
+    @Override
     public Broker getRoot() {
         try {
             return getBrokerService().getBroker();
@@ -751,6 +798,7 @@
     /**
      * @return the broker sequence id
      */
+    @Override
     public long getBrokerSequenceId() {
         synchronized(sequenceGenerator) {
             return sequenceGenerator.getNextSequenceId();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Wed Jan 20 13:27:05 2010
@@ -21,7 +21,6 @@
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
@@ -29,15 +28,17 @@
 import org.apache.activemq.broker.region.QueueMessageReference;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
-import org.apache.activemq.kaha.CommandMarshaller;
-import org.apache.activemq.kaha.ListContainer;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.kahadb.plist.PList;
+import org.apache.activemq.store.kahadb.plist.PListEntry;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.util.ByteSequence;
 
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
@@ -46,31 +47,34 @@
  * @version $Revision$
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
-    private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
+    static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
     private static final AtomicLong NAME_COUNT = new AtomicLong();
     protected Broker broker;
-    private Store store;
-    private String name;
+    private final PListStore store;
+    private final String name;
     private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
-    private ListContainer<MessageReference> diskList;
+    private PList diskList;
     private Iterator<MessageReference> iter;
     private Destination regionDestination;
     private boolean iterating;
     private boolean flushRequired;
-    private AtomicBoolean started = new AtomicBoolean();
+    private final AtomicBoolean started = new AtomicBoolean();
+    private final WireFormat wireFormat = new OpenWireFormat();
     /**
+     * @param broker
      * @param name
      * @param store
      */
-    public FilePendingMessageCursor(Broker broker,String name) {
-        this.useCache=false;
+    public FilePendingMessageCursor(Broker broker, String name) {
+        this.useCache = false;
         this.broker = broker;
-        //the store can be null if the BrokerService has persistence 
-        //turned off
-        this.store= broker.getTempDataStore();
+        // the store can be null if the BrokerService has persistence
+        // turned off
+        this.store = broker.getTempDataStore();
         this.name = NAME_COUNT.incrementAndGet() + "_" + name;
     }
 
+    @Override
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
             super.start();
@@ -80,6 +84,7 @@
         }
     }
 
+    @Override
     public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
             super.stop();
@@ -92,13 +97,14 @@
     /**
      * @return true if there are no pending messages
      */
+    @Override
     public synchronized boolean isEmpty() {
-        if(memoryList.isEmpty() && isDiskListEmpty()){
+        if (memoryList.isEmpty() && isDiskListEmpty()) {
             return true;
         }
         for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
             MessageReference node = iterator.next();
-            if (node== QueueMessageReference.NULL_MESSAGE){
+            if (node == QueueMessageReference.NULL_MESSAGE) {
                 continue;
             }
             if (!node.isDropped()) {
@@ -109,18 +115,22 @@
         }
         return isDiskListEmpty();
     }
-    
-    
 
     /**
      * reset the cursor
      */
+    @Override
     public synchronized void reset() {
         iterating = true;
         last = null;
-        iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
+        if (isDiskListEmpty()) {
+            this.iter = this.memoryList.iterator();
+        } else {
+            this.iter = new DiskIterator();
+        }
     }
 
+    @Override
     public synchronized void release() {
         iterating = false;
         if (flushRequired) {
@@ -129,10 +139,11 @@
         }
     }
 
+    @Override
     public synchronized void destroy() throws Exception {
         stop();
         for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
-            Message node = (Message)i.next();
+            Message node = (Message) i.next();
             node.decrementReferenceCount();
         }
         memoryList.clear();
@@ -141,26 +152,22 @@
 
     private void destroyDiskList() throws Exception {
         if (!isDiskListEmpty()) {
-            Iterator<MessageReference> iterator = diskList.iterator();
-            while (iterator.hasNext()) {
-                iterator.next();
-                iterator.remove();
-            }
-            diskList.clear();
-        }   
-	    store.deleteListContainer(name, "TopicSubscription");
+            store.removePList(name);
+        }
     }
 
+    @Override
     public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
         LinkedList<MessageReference> result = new LinkedList<MessageReference>();
         int count = 0;
         for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
-            result.add(i.next());
+            MessageReference ref = i.next();
+            result.add(ref);
             count++;
         }
         if (count < maxItems && !isDiskListEmpty()) {
-            for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
-                Message message = (Message)i.next();
+            for (Iterator<MessageReference> i = new DiskIterator(); i.hasNext() && count < maxItems;) {
+                Message message = (Message) i.next();
                 message.setRegionDestination(regionDestination);
                 message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
                 message.incrementReferenceCount();
@@ -176,12 +183,13 @@
      * 
      * @param node
      */
+    @Override
     public synchronized void addMessageLast(MessageReference node) {
         if (!node.isExpired()) {
             try {
                 regionDestination = node.getMessage().getRegionDestination();
                 if (isDiskListEmpty()) {
-                    if (hasSpace() || this.store==null) {
+                    if (hasSpace() || this.store == null) {
                         memoryList.add(node);
                         node.incrementReferenceCount();
                         return;
@@ -200,11 +208,11 @@
                     }
                 }
                 systemUsage.getTempUsage().waitForSpace();
-                getDiskList().add(node);
+                ByteSequence bs = getByteSequence(node.getMessage());
+                getDiskList().addLast(node.getMessageId().toString(), bs);
 
             } catch (Exception e) {
-                LOG.error("Caught an Exception adding a message: " + node
-                        + " first to FilePendingMessageCursor ", e);
+                LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
                 throw new RuntimeException(e);
             }
         } else {
@@ -217,6 +225,7 @@
      * 
      * @param node
      */
+    @Override
     public synchronized void addMessageFirst(MessageReference node) {
         if (!node.isExpired()) {
             try {
@@ -242,11 +251,11 @@
                 }
                 systemUsage.getTempUsage().waitForSpace();
                 node.decrementReferenceCount();
-                getDiskList().addFirst(node);
+                ByteSequence bs = getByteSequence(node.getMessage());
+                getDiskList().addFirst(node.getMessageId().toString(), bs);
 
             } catch (Exception e) {
-                LOG.error("Caught an Exception adding a message: " + node
-                        + " first to FilePendingMessageCursor ", e);
+                LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
                 throw new RuntimeException(e);
             }
         } else {
@@ -257,6 +266,7 @@
     /**
      * @return true if there pending messages to dispatch
      */
+    @Override
     public synchronized boolean hasNext() {
         return iter.hasNext();
     }
@@ -264,8 +274,9 @@
     /**
      * @return the next pending message
      */
+    @Override
     public synchronized MessageReference next() {
-        Message message = (Message)iter.next();
+        Message message = (Message) iter.next();
         last = message;
         if (!isDiskListEmpty()) {
             // got from disk
@@ -279,10 +290,11 @@
     /**
      * remove the message at the cursor position
      */
+    @Override
     public synchronized void remove() {
         iter.remove();
         if (last != null) {
-        	last.decrementReferenceCount();
+            last.decrementReferenceCount();
         }
     }
 
@@ -290,18 +302,24 @@
      * @param node
      * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
      */
+    @Override
     public synchronized void remove(MessageReference node) {
         if (memoryList.remove(node)) {
-        	node.decrementReferenceCount();
+            node.decrementReferenceCount();
         }
         if (!isDiskListEmpty()) {
-            getDiskList().remove(node);
+            try {
+                getDiskList().remove(node.getMessageId().toString());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
     /**
      * @return the number of pending messages
      */
+    @Override
     public synchronized int size() {
         return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
     }
@@ -309,31 +327,37 @@
     /**
      * clear all pending messages
      */
+    @Override
     public synchronized void clear() {
         memoryList.clear();
         if (!isDiskListEmpty()) {
-            getDiskList().clear();
+            try {
+                getDiskList().destroy();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
         }
-        last=null;
+        last = null;
     }
 
-	public synchronized boolean isFull() {
+    @Override
+    public synchronized boolean isFull() {
 
-		return super.isFull()
-				|| (systemUsage != null && systemUsage.getTempUsage().isFull());
+        return super.isFull() || (systemUsage != null && systemUsage.getTempUsage().isFull());
 
-	}
+    }
 
+    @Override
     public boolean hasMessagesBufferedToDeliver() {
         return !isEmpty();
     }
 
+    @Override
     public void setSystemUsage(SystemUsage usageManager) {
         super.setSystemUsage(usageManager);
     }
 
-    public void onUsageChanged(Usage usage, int oldPercentUsage,
-            int newPercentUsage) {
+    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
             synchronized (this) {
                 flushRequired = true;
@@ -347,7 +371,8 @@
             }
         }
     }
-    
+
+    @Override
     public boolean isTransient() {
         return true;
     }
@@ -355,7 +380,7 @@
     protected boolean isSpaceInMemoryList() {
         return hasSpace() && isDiskListEmpty();
     }
-    
+
     protected synchronized void expireOldMessages() {
         if (!memoryList.isEmpty()) {
             LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
@@ -364,21 +389,29 @@
                 MessageReference node = tmpList.removeFirst();
                 if (node.isExpired()) {
                     discard(node);
-                }else {
+                } else {
                     memoryList.add(node);
-                }               
+                }
             }
         }
 
     }
 
     protected synchronized void flushToDisk() {
-       
+
         if (!memoryList.isEmpty()) {
             while (!memoryList.isEmpty()) {
                 MessageReference node = memoryList.removeFirst();
                 node.decrementReferenceCount();
-                getDiskList().addLast(node);
+                ByteSequence bs;
+                try {
+                    bs = getByteSequence(node.getMessage());
+                    getDiskList().addLast(node.getMessageId().toString(), bs);
+                } catch (IOException e) {
+                    LOG.error("Failed to write to disk list", e);
+                    throw new RuntimeException(e);
+                }
+
             }
             memoryList.clear();
         }
@@ -388,19 +421,18 @@
         return diskList == null || diskList.isEmpty();
     }
 
-    protected ListContainer<MessageReference> getDiskList() {
+    protected PList getDiskList() {
         if (diskList == null) {
             try {
-                diskList = store.getListContainer(name, "TopicSubscription", true);
-                diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
-            } catch (IOException e) {
+                diskList = store.getPList(name);
+            } catch (Exception e) {
                 LOG.error("Caught an IO Exception getting the DiskList " + name, e);
                 throw new RuntimeException(e);
             }
         }
         return diskList;
     }
-    
+
     protected void discard(MessageReference message) {
         message.decrementReferenceCount();
         if (LOG.isDebugEnabled()) {
@@ -408,4 +440,68 @@
         }
         broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(new NonCachedMessageEvaluationContext()), message);
     }
+
+    protected ByteSequence getByteSequence(Message message) throws IOException {
+        org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
+        return new ByteSequence(packet.data, packet.offset, packet.length);
+    }
+
+    protected Message getMessage(ByteSequence bs) throws IOException {
+        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(bs.getData(), bs
+                .getOffset(), bs.getLength());
+        return (Message) this.wireFormat.unmarshal(packet);
+
+    }
+
+    /**
+     * Iterator over the messages stored in the disk list.
+     * <p>
+     * The iterator keeps the entry most recently returned ({@code current})
+     * and the entry that will be returned next ({@code next}). Before each use
+     * an entry is re-read via {@code PList.refresh} so that its links reflect
+     * changes made to the list since the entry was loaded.
+     */
+    final class DiskIterator implements Iterator<MessageReference> {
+        private PListEntry next = null;
+        private PListEntry current = null;
+        PList list;
+
+        DiskIterator() {
+            try {
+                this.list = getDiskList();
+                synchronized (this.list) {
+                    this.current = this.list.getFirst();
+                    this.next = this.current;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * @return true if another entry is available
+         */
+        public boolean hasNext() {
+            return this.next != null;
+        }
+
+        /**
+         * Returns the next message and advances the iterator.
+         *
+         * @return the message unmarshalled from the next entry's payload
+         * @throws java.util.NoSuchElementException if the iterator is exhausted
+         */
+        public MessageReference next() {
+            // Honour the Iterator contract instead of failing with a NullPointerException below.
+            if (this.next == null) {
+                throw new java.util.NoSuchElementException();
+            }
+            this.current = next;
+            try {
+                ByteSequence bs = this.current.getByteSequence();
+                synchronized (this.list) {
+                    this.current = this.list.refresh(this.current);
+                    this.next = this.list.getNext(this.current);
+                }
+                return getMessage(bs);
+            } catch (IOException e) {
+                LOG.error("I/O error", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Removes the current entry from the disk list.
+         *
+         * @throws IllegalStateException if there is no current entry
+         */
+        public void remove() {
+            // An empty list leaves current == null; fail clearly rather than inside PList.refresh().
+            if (this.current == null) {
+                throw new IllegalStateException("No current entry to remove");
+            }
+            try {
+                synchronized (this.list) {
+                    this.current = this.list.refresh(this.current);
+                    this.list.remove(this.current);
+                }
+
+            } catch (IOException e) {
+                LOG.error("I/O error", e);
+                throw new RuntimeException(e);
+            }
+
+        }
+
+    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java?rev=901188&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java Wed Jan 20 13:27:05 2010
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.plist;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.util.VariableMarshaller;
+
+/**
+ * A node in the linked list of pages maintained by {@link PList}.
+ * <p>
+ * Holds the entry id, the page ids of the previous and next nodes and the
+ * journal {@link Location} of the payload. The {@link Page} the node lives on
+ * is tracked in memory only; it is not written by
+ * {@link #writeExternal(DataOutput)}.
+ */
+class EntryLocation {
+    /** Marker meaning "no page"; used for an absent prev/next link. */
+    static final long NOT_SET = -1;
+    private String id;
+    private Page<EntryLocation> page;
+    // Links start as NOT_SET rather than 0, so a freshly constructed entry
+    // never appears to be linked to page 0.
+    private long next = NOT_SET;
+    private long prev = NOT_SET;
+    private Location location;
+
+    /**
+     * Marshaller that reads and writes entries through
+     * {@link EntryLocation#readExternal(DataInput)} and
+     * {@link EntryLocation#writeExternal(DataOutput)}.
+     */
+    static class EntryLocationMarshaller extends VariableMarshaller<EntryLocation> {
+        static final EntryLocationMarshaller INSTANCE = new EntryLocationMarshaller();
+
+        public EntryLocation readPayload(DataInput dataIn) throws IOException {
+            EntryLocation result = new EntryLocation();
+            result.readExternal(dataIn);
+            return result;
+        }
+
+        public void writePayload(EntryLocation value, DataOutput dataOut) throws IOException {
+            value.writeExternal(dataOut);
+        }
+    }
+
+    /**
+     * @param location
+     *            the journal location of the payload
+     */
+    EntryLocation(Location location) {
+        this.location = location;
+    }
+
+    EntryLocation() {
+    }
+
+    /**
+     * @return a new entry with the same id, links and page; the
+     *         {@link Location} instance is shared, not cloned
+     */
+    EntryLocation copy() {
+        EntryLocation result = new EntryLocation();
+        result.id = this.id;
+        result.location = this.location;
+        result.next = this.next;
+        result.prev = this.prev;
+        result.page = this.page;
+        return result;
+    }
+
+    /**
+     * Clears the id and both links; the page and location are left untouched.
+     */
+    void reset() {
+        this.id = "";
+        this.next = NOT_SET;
+        this.prev = NOT_SET;
+    }
+
+    /**
+     * Reads id, prev, next and location, in that order.
+     *
+     * @param in
+     *            the input to read from
+     * @throws IOException
+     *             if reading fails
+     */
+    public void readExternal(DataInput in) throws IOException {
+        this.id = in.readUTF();
+        this.prev = in.readLong();
+        this.next = in.readLong();
+        if (this.location == null) {
+            this.location = new Location();
+        }
+        this.location.readExternal(in);
+    }
+
+    /**
+     * Writes id, prev, next and location, in that order. A missing location
+     * is replaced with a new, empty {@link Location} before writing.
+     *
+     * @param out
+     *            the output to write to
+     * @throws IOException
+     *             if writing fails
+     */
+    public void writeExternal(DataOutput out) throws IOException {
+        out.writeUTF(this.id);
+        out.writeLong(this.prev);
+        out.writeLong(this.next);
+        if (this.location == null) {
+            this.location = new Location();
+        }
+        this.location.writeExternal(out);
+    }
+
+    /**
+     * @return the id of this entry
+     */
+    String getId() {
+        return this.id;
+    }
+
+    /**
+     * @param id
+     *            the id to set
+     */
+    void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the journal location of the payload
+     */
+    Location getLocation() {
+        return this.location;
+    }
+
+    /**
+     * @param location
+     *            the location to set
+     */
+    void setLocation(Location location) {
+        this.location = location;
+    }
+
+    /**
+     * @return the page id of the next entry, or {@link #NOT_SET}
+     */
+    long getNext() {
+        return this.next;
+    }
+
+    /**
+     * @param next
+     *            the next to set
+     */
+    void setNext(long next) {
+        this.next = next;
+    }
+
+    /**
+     * @return the page id of the previous entry, or {@link #NOT_SET}
+     */
+    long getPrev() {
+        return this.prev;
+    }
+
+    /**
+     * @param prev
+     *            the prev to set
+     */
+    void setPrev(long prev) {
+        this.prev = prev;
+    }
+
+    /**
+     * @return the page
+     */
+    Page<EntryLocation> getPage() {
+        return this.page;
+    }
+
+    /**
+     * @param page
+     *            the page to set
+     */
+    void setPage(Page<EntryLocation> page) {
+        this.page = page;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=901188&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Wed Jan 20 13:27:05 2010
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.plist;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.ByteSequence;
+
+public class PList {
+    final PListStore store;
+    private String name;
+    private long rootId = EntryLocation.NOT_SET;
+    private long lastId = EntryLocation.NOT_SET;
+    private final AtomicBoolean loaded = new AtomicBoolean();
+    private int size = 0;
+
+    PList(PListStore store) {
+
+        this.store = store;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the name of this list
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    public synchronized int size() {
+        return this.size;
+    }
+
+    public synchronized boolean isEmpty() {
+        return size == 0;
+    }
+
+    /**
+     * @return the rootId
+     */
+    public long getRootId() {
+        return this.rootId;
+    }
+
+    /**
+     * @param rootId
+     *            the rootId to set
+     */
+    public void setRootId(long rootId) {
+        this.rootId = rootId;
+    }
+
+    /**
+     * @return the lastId
+     */
+    public long getLastId() {
+        return this.lastId;
+    }
+
+    /**
+     * @param lastId
+     *            the lastId to set
+     */
+    public void setLastId(long lastId) {
+        this.lastId = lastId;
+    }
+
+    /**
+     * @return the loaded
+     */
+    public boolean isLoaded() {
+        return this.loaded.get();
+    }
+
+    void read(DataInput in) throws IOException {
+        this.rootId = in.readLong();
+        this.name = in.readUTF();
+    }
+
+    public void write(DataOutput out) throws IOException {
+        out.writeLong(this.rootId);
+        out.writeUTF(name);
+    }
+
+    public synchronized void destroy() throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                destroy(tx);
+            }
+        });
+    }
+
+    void destroy(Transaction tx) throws IOException {
+        // start from the first
+        EntryLocation entry = loadEntry(tx, getRoot(tx).getNext());
+        while (entry != null) {
+            EntryLocation toRemove = entry.copy();
+            entry = loadEntry(tx, entry.getNext());
+            doRemove(tx, toRemove);
+        }
+    }
+
+    /**
+     * Loads the list once: creates the root entry if the root page is still
+     * free, otherwise walks the chain to find the last page id and to count
+     * the entries.
+     *
+     * @param tx
+     *            the transaction to run in
+     * @throws IOException
+     *             if a page cannot be read or written
+     */
+    synchronized void load(Transaction tx) throws IOException {
+        if (loaded.compareAndSet(false, true)) {
+            final Page<EntryLocation> p = tx.load(this.rootId, null);
+            if (p.getType() == Page.PAGE_FREE_TYPE) {
+                // Need to initialize it..
+                EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
+
+                storeEntry(tx, root);
+                this.lastId = root.getPage().getPageId();
+            } else {
+                // Find the last id and recount from scratch so a reload does
+                // not add to a previous count.
+                this.size = 0;
+                long nextId = this.rootId;
+                while (nextId != EntryLocation.NOT_SET) {
+                    EntryLocation next = getNext(tx, nextId);
+                    if (next == null) {
+                        break;
+                    }
+                    this.lastId = next.getPage().getPageId();
+                    // The walk starts at the root, which is not a list element.
+                    if (nextId != this.rootId) {
+                        this.size++;
+                    }
+                    nextId = next.getNext();
+                }
+            }
+        }
+    }
+
+    /**
+     * Marks the list as not loaded; nothing else is released.
+     */
+    synchronized public void unload() {
+        this.loaded.compareAndSet(true, false);
+    }
+
+    synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                addLast(tx, id, bs);
+            }
+        });
+    }
+
+    void addLast(Transaction tx, String id, ByteSequence bs) throws IOException {
+        Location location = this.store.write(bs, false);
+        EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
+        entry.setLocation(location);
+        storeEntry(tx, entry);
+        this.store.incrementJournalCount(tx, location);
+
+        EntryLocation last = loadEntry(tx, this.lastId);
+        last.setNext(entry.getPage().getPageId());
+        storeEntry(tx, last);
+        this.lastId = entry.getPage().getPageId();
+        this.size++;
+    }
+
+    synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                addFirst(tx, id, bs);
+            }
+        });
+    }
+
+    /**
+     * Writes the payload to the journal and links a new entry directly after
+     * the root.
+     *
+     * @param tx
+     *            the transaction to run in
+     * @param id
+     *            the id of the new entry
+     * @param bs
+     *            the payload
+     * @throws IOException
+     *             if the journal or a page cannot be written
+     */
+    void addFirst(Transaction tx, String id, ByteSequence bs) throws IOException {
+        Location location = this.store.write(bs, false);
+        // Link back to the root (as addLast() does for the first entry) so
+        // that doRemove() re-points root.next when this entry is removed.
+        EntryLocation entry = createEntry(tx, id, this.rootId, EntryLocation.NOT_SET);
+        entry.setLocation(location);
+        EntryLocation oldFirst = getFirst(tx);
+        if (oldFirst != null) {
+            oldFirst.setPrev(entry.getPage().getPageId());
+            storeEntry(tx, oldFirst);
+            entry.setNext(oldFirst.getPage().getPageId());
+        } else {
+            // The list was empty, so the new entry is also the last one;
+            // otherwise a following addLast() would append to the root and
+            // orphan this entry.
+            this.lastId = entry.getPage().getPageId();
+        }
+        EntryLocation root = getRoot(tx);
+        root.setNext(entry.getPage().getPageId());
+        storeEntry(tx, root);
+        storeEntry(tx, entry);
+
+        this.store.incrementJournalCount(tx, location);
+        this.size++;
+    }
+
+    synchronized public boolean remove(final String id) throws IOException {
+        final AtomicBoolean result = new AtomicBoolean();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                result.set(remove(tx, id));
+            }
+        });
+        return result.get();
+    }
+
+    synchronized public boolean remove(final int position) throws IOException {
+        final AtomicBoolean result = new AtomicBoolean();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                result.set(remove(tx, position));
+            }
+        });
+        return result.get();
+    }
+
+    synchronized public boolean remove(final PListEntry entry) throws IOException {
+        final AtomicBoolean result = new AtomicBoolean();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                result.set(doRemove(tx, entry.getEntry()));
+            }
+        });
+        return result.get();
+    }
+
+    synchronized public PListEntry get(final int position) throws IOException {
+        PListEntry result = null;
+        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                ref.set(get(tx, position));
+            }
+        });
+        if (ref.get() != null) {
+            ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+            result = new PListEntry(ref.get(), bs);
+        }
+        return result;
+    }
+
+    synchronized public PListEntry getFirst() throws IOException {
+        PListEntry result = null;
+        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                ref.set(getFirst(tx));
+            }
+        });
+        if (ref.get() != null) {
+            ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+            result = new PListEntry(ref.get(), bs);
+        }
+        return result;
+    }
+
+    synchronized public PListEntry getLast() throws IOException {
+        PListEntry result = null;
+        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                ref.set(getLast(tx));
+            }
+        });
+        if (ref.get() != null) {
+            ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+            result = new PListEntry(ref.get(), bs);
+        }
+        return result;
+    }
+
+    synchronized public PListEntry getNext(PListEntry entry) throws IOException {
+        PListEntry result = null;
+        final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
+        if (nextId != EntryLocation.NOT_SET) {
+            final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    ref.set(getNext(tx, nextId));
+                }
+            });
+            if (ref.get() != null) {
+                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+                result = new PListEntry(ref.get(), bs);
+            }
+        }
+        return result;
+    }
+
+    synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
+        PListEntry result = null;
+        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
+            }
+        });
+        if (ref.get() != null) {
+
+            result = new PListEntry(ref.get(), entry.getByteSequence());
+        }
+        return result;
+    }
+
+    /**
+     * Removes the first entry whose id equals the given id.
+     *
+     * @param tx
+     *            the transaction to run in
+     * @param id
+     *            the id to look for
+     * @return true if an entry was found and removed
+     * @throws IOException
+     *             if a page cannot be read or written
+     */
+    boolean remove(Transaction tx, String id) throws IOException {
+        // Start at the first real entry (the root is never a candidate) and
+        // advance on every iteration so the scan terminates at the end of
+        // the list.
+        EntryLocation entry = getFirst(tx);
+        while (entry != null) {
+            if (entry.getId().equals(id)) {
+                return doRemove(tx, entry);
+            }
+            entry = getNext(tx, entry.getNext());
+        }
+        return false;
+    }
+
+    /**
+     * Removes the entry at the given position. Position 0 is the first entry
+     * after the root, matching {@code get(tx, position)}.
+     *
+     * @param tx
+     *            the transaction to run in
+     * @param position
+     *            zero-based index of the entry to remove
+     * @return true if an entry existed at that position and was removed
+     * @throws IOException
+     *             if a page cannot be read or written
+     */
+    boolean remove(Transaction tx, int position) throws IOException {
+        int count = 0;
+        // Skip the root so that position 0 never removes it, and advance on
+        // every iteration so the scan terminates at the end of the list.
+        EntryLocation entry = getFirst(tx);
+        while (entry != null) {
+            if (count == position) {
+                return doRemove(tx, entry);
+            }
+            entry = getNext(tx, entry.getNext());
+            count++;
+        }
+        return false;
+    }
+
+    EntryLocation get(Transaction tx, int position) throws IOException {
+        EntryLocation result = null;
+        long nextId = this.rootId;
+        int count = -1;
+        while (nextId != EntryLocation.NOT_SET) {
+            EntryLocation entry = getNext(tx, nextId);
+            if (entry != null) {
+                if (count == position) {
+                    result = entry;
+                    break;
+                }
+                nextId = entry.getNext();
+            } else {
+                break;
+            }
+            count++;
+        }
+        return result;
+    }
+
+    EntryLocation getFirst(Transaction tx) throws IOException {
+        long offset = getRoot(tx).getNext();
+        if (offset != EntryLocation.NOT_SET) {
+            return loadEntry(tx, offset);
+        }
+        return null;
+    }
+
+    /**
+     * @return the entry stored at lastId, or null if lastId is not set
+     */
+    EntryLocation getLast(Transaction tx) throws IOException {
+        // getNext() yields null for NOT_SET and loads the page otherwise
+        return getNext(tx, this.lastId);
+    }
+
+    /**
+     * Unlinks an entry from the chain, releases its journal reference and
+     * frees its page.
+     *
+     * @param tx
+     *            the transaction to run in
+     * @param entry
+     *            the entry to remove; may be null
+     * @return true if an entry was removed, false if entry was null
+     * @throws IOException
+     *             if a page cannot be read or written
+     */
+    boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
+        boolean result = false;
+        if (entry != null) {
+
+            // Load both neighbours; either is null when the link is NOT_SET.
+            EntryLocation prev = getPrevious(tx, entry.getPrev());
+            EntryLocation next = getNext(tx, entry.getNext());
+            // With no previous entry the root id stands in as the predecessor id.
+            long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
+            long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
+
+            if (next != null) {
+                next.setPrev(prevId);
+                storeEntry(tx, next);
+            } else {
+                // we are deleting the last one in the list
+                this.lastId = prevId;
+            }
+            if (prev != null) {
+                prev.setNext(nextId);
+                storeEntry(tx, prev);
+            }
+
+            this.store.decrementJournalCount(tx, entry.getLocation());
+            // Clear the id and links and write that out before the page is freed.
+            entry.reset();
+            storeEntry(tx, entry);
+            tx.free(entry.getPage().getPageId());
+            result = true;
+            this.size--;
+        }
+        return result;
+    }
+
+    private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
+        Page<EntryLocation> p = tx.allocate();
+        EntryLocation result = new EntryLocation();
+        result.setPage(p);
+        p.set(result);
+        result.setId(id);
+        result.setPrev(previous);
+        result.setNext(next);
+        return result;
+    }
+
+    private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
+        EntryLocation result = new EntryLocation();
+        result.setPage(p);
+        p.set(result);
+        result.setId(id);
+        result.setPrev(previous);
+        result.setNext(next);
+        return result;
+    }
+
+    EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
+        Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
+        EntryLocation entry = page.get();
+        entry.setPage(page);
+        return entry;
+    }
+    private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
+        tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
+    }
+
+    /**
+     * @return the entry on page {@code next}, or null if {@code next} is
+     *         NOT_SET
+     */
+    EntryLocation getNext(Transaction tx, long next) throws IOException {
+        if (next == EntryLocation.NOT_SET) {
+            return null;
+        }
+        return loadEntry(tx, next);
+    }
+
+    /**
+     * @return the entry on page {@code previous}, or null if {@code previous}
+     *         is NOT_SET
+     */
+    private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
+        return previous == EntryLocation.NOT_SET ? null : loadEntry(tx, previous);
+    }
+
+    /**
+     * @return the entry stored on the root page
+     */
+    private EntryLocation getRoot(Transaction tx) throws IOException {
+        return loadEntry(tx, this.rootId);
+    }
+
+    ByteSequence getPayload(EntryLocation entry) throws IOException {
+        return this.store.getPayload(entry.getLocation());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java?rev=901188&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java Wed Jan 20 13:27:05 2010
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.plist;
+
+import org.apache.kahadb.util.ByteSequence;
+
+public class PListEntry {
+
+    private final ByteSequence byteSequence;
+    private final EntryLocation entry;
+
+    PListEntry(EntryLocation entry, ByteSequence bs) {
+        this.entry = entry;
+        this.byteSequence = bs;
+    }
+
+    /**
+     * @return the byteSequence
+     */
+    public ByteSequence getByteSequence() {
+        return this.byteSequence;
+    }
+
+    public String getId() {
+        return this.entry.getId();
+    }
+
+    /**
+     * @return the entry
+     */
+    EntryLocation getEntry() {
+        return this.entry;
+    }
+
+    public PListEntry copy() {
+        return new PListEntry(this.entry, this.byteSequence);
+    }
+
+    @Override
+    public String toString() {
+        return this.entry.getId() + "[pageId=" + this.entry.getPage().getPageId() + ",next=" + this.entry.getNext()
+                + "]";
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=901188&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Wed Jan 20 13:27:05 2010
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.plist;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Journal;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IntegerMarshaller;
+import org.apache.kahadb.util.LockFile;
+import org.apache.kahadb.util.StringMarshaller;
+import org.apache.kahadb.util.VariableMarshaller;
+
+public class PListStore extends ServiceSupport {
+    static final Log LOG = LogFactory.getLog(PListStore.class);
+    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+    public static final int CLOSED_STATE = 1;
+    public static final int OPEN_STATE = 2;
+
+    private File directory;
+    PageFile pageFile;
+    private Journal journal;
+    private LockFile lockFile;
+    private boolean failIfDatabaseIsLocked;
+    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+    private boolean enableIndexWriteAsync = false;
+    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+    MetaData metaData = new MetaData(this);
+    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
+    Map<String, PList> persistentLists = new HashMap<String, PList>();
+
+    protected class MetaData {
+        protected MetaData(PListStore store) {
+            this.store = store;
+            LinkedList list = new LinkedList();
+        }
+        private final PListStore store;
+        Page<MetaData> page;
+        BTreeIndex<Integer, Integer> journalRC;
+        BTreeIndex<String, PList> storedSchedulers;
+
+        void createIndexes(Transaction tx) throws IOException {
+            this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
+            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
+        }
+
+        void load(Transaction tx) throws IOException {
+            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
+            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+            this.storedSchedulers.load(tx);
+            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
+            this.journalRC.load(tx);
+        }
+
+        void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
+            for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
+                Entry<String, PList> entry = i.next();
+                entry.getValue().load(tx);
+                schedulers.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        public void read(DataInput is) throws IOException {
+            this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
+            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
+            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
+            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
+        }
+
+        public void write(DataOutput os) throws IOException {
+            os.writeLong(this.storedSchedulers.getPageId());
+            os.writeLong(this.journalRC.getPageId());
+
+        }
+    }
+
+    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
+        private final PListStore store;
+
+        MetaDataMarshaller(PListStore store) {
+            this.store = store;
+        }
+        public MetaData readPayload(DataInput dataIn) throws IOException {
+            MetaData rc = new MetaData(this.store);
+            rc.read(dataIn);
+            return rc;
+        }
+
+        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
+            object.write(dataOut);
+        }
+    }
+
+    class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
+        public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
+            List<EntryLocation> result = new ArrayList<EntryLocation>();
+            int size = dataIn.readInt();
+            for (int i = 0; i < size; i++) {
+                EntryLocation jobLocation = new EntryLocation();
+                jobLocation.readExternal(dataIn);
+                result.add(jobLocation);
+            }
+            return result;
+        }
+
+        public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.size());
+            for (EntryLocation jobLocation : value) {
+                jobLocation.writeExternal(dataOut);
+            }
+        }
+    }
+
+    class JobSchedulerMarshaller extends VariableMarshaller<PList> {
+        private final PListStore store;
+        JobSchedulerMarshaller(PListStore store) {
+            this.store = store;
+        }
+        public PList readPayload(DataInput dataIn) throws IOException {
+            PList result = new PList(this.store);
+            result.read(dataIn);
+            return result;
+        }
+
+        public void writePayload(PList js, DataOutput dataOut) throws IOException {
+            js.write(dataOut);
+        }
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+    
+    public long size() {
+        if ( !isStarted() ) {
+            return 0;
+        }
+        try {
+            return journal.getDiskSize() + pageFile.getDiskSize();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public PList getPList(final String name) throws Exception {
+        PList result = this.persistentLists.get(name);
+        if (result == null) {
+            final PList pl = new PList(this);
+            pl.setName(name);
+            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    pl.setRootId(tx.allocate().getPageId());
+                    pl.load(tx);
+                    metaData.storedSchedulers.put(tx, name, pl);
+                }
+            });
+            result = pl;
+            this.persistentLists.put(name, pl);
+        }
+        final PList load = result;
+        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                load.load(tx);
+            }
+        });
+       
+        return result;
+    }
+
+    synchronized public boolean removePList(final String name) throws Exception {
+        boolean result = false;
+        final PList pl = this.persistentLists.remove(name);
+        result = pl != null;
+        if (result) {
+            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    metaData.storedSchedulers.remove(tx, name);
+                    pl.destroy(tx);
+                }
+            });
+        }
+        return result;
+    }
+
+    @Override
+    protected synchronized void doStart() throws Exception {
+        if (this.directory == null) {
+            this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
+        }
+        IOHelper.mkdirs(this.directory);
+        lock();
+        this.journal = new Journal();
+        this.journal.setDirectory(directory);
+        this.journal.setMaxFileLength(getJournalMaxFileLength());
+        this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
+        this.journal.start();
+        this.pageFile = new PageFile(directory, "scheduleDB");
+        this.pageFile.load();
+
+        this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                if (pageFile.getPageCount() == 0) {
+                    Page<MetaData> page = tx.allocate();
+                    assert page.getPageId() == 0;
+                    page.set(metaData);
+                    metaData.page = page;
+                    metaData.createIndexes(tx);
+                    tx.store(metaData.page, metaDataMarshaller, true);
+
+                } else {
+                    Page<MetaData> page = tx.load(0, metaDataMarshaller);
+                    metaData = page.get();
+                    metaData.page = page;
+                }
+                metaData.load(tx);
+                metaData.loadLists(tx, persistentLists);
+            }
+        });
+
+        this.pageFile.flush();
+        LOG.info(this + " started");
+    }
+    
+    @Override
+    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+        for (PList pl : this.persistentLists.values()) {
+            pl.unload();
+        }
+        if (this.pageFile != null) {
+            this.pageFile.unload();
+        }
+        if (this.journal != null) {
+            journal.close();
+        }
+        if (this.lockFile != null) {
+            this.lockFile.unlock();
+        }
+        this.lockFile = null;
+        LOG.info(this + " stopped");
+
+    }
+
+    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
+        int logId = location.getDataFileId();
+        Integer val = this.metaData.journalRC.get(tx, logId);
+        int refCount = val != null ? val.intValue() + 1 : 1;
+        this.metaData.journalRC.put(tx, logId, refCount);
+
+    }
+
+    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
+        int logId = location.getDataFileId();
+        int refCount = this.metaData.journalRC.get(tx, logId);
+        refCount--;
+        if (refCount <= 0) {
+            this.metaData.journalRC.remove(tx, logId);
+            Set<Integer> set = new HashSet<Integer>();
+            set.add(logId);
+            this.journal.removeDataFiles(set);
+        } else {
+            this.metaData.journalRC.put(tx, logId, refCount);
+        }
+
+    }
+
+    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+        ByteSequence result = null;
+        result = this.journal.read(location);
+        return result;
+    }
+
+    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
+        return this.journal.write(payload, sync);
+    }
+
+    private void lock() throws IOException {
+        if (lockFile == null) {
+            File lockFileName = new File(directory, "lock");
+            lockFile = new LockFile(lockFileName, true);
+            if (failIfDatabaseIsLocked) {
+                lockFile.lock();
+            } else {
+                while (true) {
+                    try {
+                        lockFile.lock();
+                        break;
+                    } catch (IOException e) {
+                        LOG.info("Database " + lockFileName + " is locked... waiting "
+                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
+                                + " seconds for the database to be unlocked. Reason: " + e);
+                        try {
+                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
+                        } catch (InterruptedException e1) {
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    PageFile getPageFile() {
+        this.pageFile.isLoaded();
+        return this.pageFile;
+    }
+
+    public boolean isFailIfDatabaseIsLocked() {
+        return failIfDatabaseIsLocked;
+    }
+
+    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+    }
+
+    public int getJournalMaxFileLength() {
+        return journalMaxFileLength;
+    }
+
+    public void setJournalMaxFileLength(int journalMaxFileLength) {
+        this.journalMaxFileLength = journalMaxFileLength;
+    }
+
+    public int getJournalMaxWriteBatchSize() {
+        return journalMaxWriteBatchSize;
+    }
+
+    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
+        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
+    }
+
+    public boolean isEnableIndexWriteAsync() {
+        return enableIndexWriteAsync;
+    }
+
+    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+        this.enableIndexWriteAsync = enableIndexWriteAsync;
+    }
+
+    @Override
+    public String toString() {
+        return "JobSchedulerStore:" + this.directory;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Wed Jan 20 13:27:05 2010
@@ -19,8 +19,8 @@
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.activemq.Service;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 
 /**
  * Holder for Usage instances for memory, store and temp files Main use case is
@@ -43,13 +43,13 @@
      */
     private boolean sendFailIfNoSpaceExplicitySet;
     private boolean sendFailIfNoSpace;
-    private List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
+    private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
 
     public SystemUsage() {
         this("default", null, null);
     }
 
-    public SystemUsage(String name, PersistenceAdapter adapter, Store tempStore) {
+    public SystemUsage(String name, PersistenceAdapter adapter, PListStore tempStore) {
         this.parent = null;
         this.name = name;
         this.memoryUsage = new MemoryUsage(name + ":memory");
@@ -90,6 +90,7 @@
         return this.tempUsage;
     }
 
+    @Override
     public String toString() {
         return "UsageManager(" + getName() + ")";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java Wed Jan 20 13:27:05 2010
@@ -16,7 +16,8 @@
  */
 package org.apache.activemq.usage;
 
-import org.apache.activemq.kaha.Store;
+import org.apache.activemq.store.kahadb.plist.PListStore;
+
 
 /**
  * Used to keep track of how much of something is being used so that a
@@ -28,13 +29,13 @@
  */
 public class TempUsage extends Usage<TempUsage> {
 
-    private Store store;
+    private PListStore store;
 
     public TempUsage() {
         super(null, null, 1.0f);
     }
 
-    public TempUsage(String name, Store store) {
+    public TempUsage(String name, PListStore store) {
         super(null, name, 1.0f);
         this.store = store;
     }
@@ -44,6 +45,7 @@
         this.store = parent.store;
     }
 
+    @Override
     protected long retrieveUsage() {
         if (store == null) {
             return 0;
@@ -51,11 +53,11 @@
         return store.size();
     }
 
-    public Store getStore() {
+    public PListStore getStore() {
         return store;
     }
 
-    public void setStore(Store store) {
+    public void setStore(PListStore store) {
         this.store = store;
         onLimitChange();
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java Wed Jan 20 13:27:05 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.bugs;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
@@ -33,8 +32,8 @@
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.kaha.Store;
 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -82,6 +81,7 @@
         producerConnection.start();
 
         Thread producingThread = new Thread("Producing thread") {
+            @Override
             public void run() {
                 try {
                     Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -180,7 +180,9 @@
         persistence.setDirectory(directory);
         File tmpDir = new File(directory, "tmp");
         IOHelper.deleteChildren(tmpDir);
-        Store tempStore = new org.apache.activemq.kaha.impl.KahaStore(tmpDir, "rw");
+        PListStore tempStore = new PListStore();
+        tempStore.setDirectory(tmpDir);
+        tempStore.start();
 
         SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore);
         MemoryUsage memUsage = new MemoryUsage();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java?rev=901188&r1=901187&r2=901188&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java Wed Jan 20 13:27:05 2010
@@ -17,7 +17,6 @@
 package org.apache.activemq.perf;
 
 import java.io.File;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 
@@ -26,19 +25,22 @@
  */
 public class KahaDBQueueTest extends SimpleQueueTest {
 
+    @Override
     protected void configureBroker(BrokerService answer,String uri) throws Exception {
 
         File dataFileDir = new File("target/test-amq-data/perfTest/kahadb");
-
+        File archiveDir = new File(dataFileDir,"archive");
         KahaDBStore kaha = new KahaDBStore();
         kaha.setDirectory(dataFileDir);
+        kaha.setDirectoryArchive(archiveDir);
+        kaha.setArchiveDataLogs(true);
 
         // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified 
         // what happens if the index is updated but a journal update is lost.
         // Index is going to be in consistent, but can it be repaired?
         kaha.setEnableJournalDiskSyncs(false);
         // Using a bigger journal file size makes he take fewer spikes as it is not switching files as often.
-        kaha.setJournalMaxFileLength(1024*100);
+        //kaha.setJournalMaxFileLength(1024*1024*100);
         
         // small batch means more frequent and smaller writes
         kaha.setIndexWriteBatchSize(100);



Mime
View raw message