activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1406370 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/ main/java/org/apac...
Date Tue, 06 Nov 2012 22:04:51 GMT
Author: chirino
Date: Tue Nov  6 22:04:50 2012
New Revision: 1406370

URL: http://svn.apache.org/viewvc?rev=1406370&view=rev
Log:
Extract out a PListStore interface so that the broker can be decoupled from the KahaDB store implementation.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java
      - copied, changed from r1406369, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java
      - copied, changed from r1406369, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java
      - copied, changed from r1406369, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
      - copied, changed from r1406369, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
      - copied, changed from r1406369, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Tue Nov  6 22:04:50 2012
@@ -33,8 +33,7 @@ import org.apache.activemq.command.Messa
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.network.NetworkBridge;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Tue Nov  6 22:04:50 2012
@@ -40,7 +40,7 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Nov  6 22:04:50 2012
@@ -16,64 +16,14 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
 import org.apache.activemq.ActiveMQConnectionMetaData;
 import org.apache.activemq.ConfigurationException;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
 import org.apache.activemq.broker.ft.MasterConnector;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.FTConnectorView;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFactory;
-import org.apache.activemq.broker.region.DestinationFactoryImpl;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.jmx.*;
+import org.apache.activemq.broker.region.*;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.virtual.MirroredQueue;
 import org.apache.activemq.broker.region.virtual.VirtualDestination;
@@ -91,11 +41,10 @@ import org.apache.activemq.network.jms.J
 import org.apache.activemq.proxy.ProxyConnector;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.plist.PListStore;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -104,21 +53,23 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.stomp.ProtocolConverter;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.JMXSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
  * number of transport connectors, network connectors and a bunch of properties
@@ -133,6 +84,8 @@ public class BrokerService implements Se
     public static final String LOCAL_HOST_NAME;
     public static final String BROKER_VERSION;
     public static final String DEFAULT_BROKER_NAME = "localhost";
+    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+
     private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
     private static final long serialVersionUID = 7353129142305630237L;
     private boolean useJmx = true;
@@ -1660,7 +1613,9 @@ public class BrokerService implements Se
                     String str = result ? "Successfully deleted" : "Failed to delete";
                     LOG.info(str + " temporary storage");
                 }
-                this.tempDataStore = new PListStore();
+
+                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
+                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
                 this.tempDataStore.setDirectory(getTmpDataDirectory());
                 configureService(tempDataStore);
                 this.tempDataStore.start();
@@ -1892,12 +1847,8 @@ public class BrokerService implements Se
             long maxJournalFileSize = 0;
             long storeLimit = usage.getStoreUsage().getLimit();
 
-            if (adapter instanceof KahaDBPersistenceAdapter) {
-                KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
-                maxJournalFileSize = kahaDB.getJournalMaxFileLength();
-            } else if (adapter instanceof AMQPersistenceAdapter) {
-                AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
-                maxJournalFileSize = amqAdapter.getMaxFileLength();
+            if (adapter instanceof JournaledStore) {
+                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
             }
 
             if (storeLimit < maxJournalFileSize) {
@@ -1930,10 +1881,11 @@ public class BrokerService implements Se
             if (isPersistent()) {
                 long maxJournalFileSize;
 
-                if (usage.getTempUsage().getStore() != null) {
-                    maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
+                PListStore store = usage.getTempUsage().getStore();
+                if (store != null && store instanceof JournaledStore) {
+                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
                 } else {
-                    maxJournalFileSize = org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
+                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
                 }
 
                 if (storeLimit < maxJournalFileSize) {
@@ -2225,11 +2177,16 @@ public class BrokerService implements Se
             PersistenceAdapterFactory fac = getPersistenceFactory();
             if (fac != null) {
                 return fac.createPersistenceAdapter();
-            }else {
-                KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
-                File dir = new File(getBrokerDataDirectory(),"KahaDB");
-                adaptor.setDirectory(dir);
-                return adaptor;
+            } else {
+                try {
+                    String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+                    PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
+                    adaptor.setDirectory(dir);
+                    return adaptor;
+                } catch (Throwable e) {
+                    throw IOExceptionSupport.create(e);
+                }
             }
         } else {
             return new MemoryPersistenceAdapter();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Tue Nov  6 22:04:50 2012
@@ -41,7 +41,7 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Tue Nov  6 22:04:50 2012
@@ -41,7 +41,7 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Tue Nov  6 22:04:50 2012
@@ -41,7 +41,7 @@ import org.apache.activemq.command.Remov
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.Usage;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Nov  6 22:04:50 2012
@@ -28,7 +28,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.*;
 import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Nov  6 22:04:50 2012
@@ -30,9 +30,9 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.kahadb.plist.PList;
-import org.apache.activemq.store.kahadb.plist.PListEntry;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListStore;
+import org.apache.activemq.store.PListEntry;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
@@ -454,7 +454,7 @@ public class FilePendingMessageCursor ex
         return diskList == null || diskList.isEmpty();
     }
 
-    protected PList getDiskList() {
+    public PList getDiskList() {
         if (diskList == null) {
             try {
                 diskList = store.getPList(name);

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java?rev=1406370&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/JournaledStore.java Tue Nov  6 22:04:50 2012
@@ -0,0 +1,8 @@
+package org.apache.activemq.store;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface JournaledStore {
+    int getJournalMaxFileLength();
+}

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PList.java?rev=1406370&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PList.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PList.java Tue Nov  6 22:04:50 2012
@@ -0,0 +1,41 @@
+package org.apache.activemq.store;
+
+import org.apache.activemq.util.ByteSequence;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface PList {
+    void setName(String name);
+
+    String getName();
+
+    void destroy() throws IOException;
+
+    void addLast(String id, ByteSequence bs) throws IOException;
+
+    void addFirst(String id, ByteSequence bs) throws IOException;
+
+    boolean remove(String id) throws IOException;
+
+    boolean remove(long position) throws IOException;
+
+    PListEntry get(long position) throws IOException;
+
+    PListEntry getFirst() throws IOException;
+
+    PListEntry getLast() throws IOException;
+
+    boolean isEmpty();
+
+    PListIterator iterator() throws IOException;
+
+    long size();
+
+    public interface PListIterator extends Iterator<PListEntry> {
+        void release();
+    }
+}

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java (from r1406369, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java&r1=1406369&r2=1406370&rev=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListEntry.java Tue Nov  6 22:04:50 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.store.kahadb.plist;
+package org.apache.activemq.store;
 
 import org.apache.activemq.util.ByteSequence;
 
@@ -23,7 +23,7 @@ public class PListEntry {
     private final ByteSequence byteSequence;
     private final String entry;
 
-    PListEntry(String entry, ByteSequence bs) {
+    public PListEntry(String entry, ByteSequence bs) {
         this.entry = entry;
         this.byteSequence = bs;
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java?rev=1406370&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java Tue Nov  6 22:04:50 2012
@@ -0,0 +1,21 @@
+package org.apache.activemq.store;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.store.kahadb.plist.PListImpl;
+
+import java.io.File;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface PListStore extends Service {
+    File getDirectory();
+
+    void setDirectory(File directory);
+
+    PListImpl getPList(String name) throws Exception;
+
+    boolean removePList(String name) throws Exception;
+
+    long size();
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Tue Nov  6 22:04:50 2012
@@ -51,13 +51,7 @@ import org.apache.activemq.kaha.impl.asy
 import org.apache.activemq.kaha.impl.async.Location;
 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.ReferenceStore;
-import org.apache.activemq.store.ReferenceStoreAdapter;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TopicReferenceStore;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.Task;
@@ -82,7 +76,7 @@ import org.slf4j.LoggerFactory;
  * @org.apache.xbean.XBean element="amqPersistenceAdapter"
  * 
  */
-public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
+public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware, JournaledStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
     private Scheduler scheduler;
@@ -1117,4 +1111,9 @@ public class AMQPersistenceAdapter imple
         // reference store send has adequate duplicate suppression
         return -1;
     }
+
+    @Override
+    public int getJournalMaxFileLength() {
+        return getMaxFileLength();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Tue Nov  6 22:04:50 2012
@@ -28,11 +28,7 @@ import org.apache.activemq.command.Trans
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.broker.Locker;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -50,7 +46,7 @@ import java.util.Set;
  * @org.apache.xbean.XBean element="kahaDB"
  *
  */
-public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore {
     private final KahaDBStore letter = new KahaDBStore();
 
     /**

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java (from r1406369, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java&r1=1406369&r2=1406370&rev=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java Tue Nov  6 22:04:50 2012
@@ -26,6 +26,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListEntry;
 import org.apache.activemq.store.kahadb.disk.index.ListIndex;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
@@ -35,13 +37,13 @@ import org.apache.activemq.store.kahadb.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PList extends ListIndex<String, Location> {
-    static final Logger LOG = LoggerFactory.getLogger(PList.class);
-    final PListStore store;
+public class PListImpl extends ListIndex<String, Location> implements PList {
+    static final Logger LOG = LoggerFactory.getLogger(PListImpl.class);
+    final PListStoreImpl store;
     private String name;
     Object indexLock;
 
-    PList(PListStore store) {
+    PListImpl(PListStoreImpl store) {
         this.store = store;
         this.indexLock = store.getIndexLock();
         setPageFile(store.getPageFile());
@@ -49,10 +51,12 @@ public class PList extends ListIndex<Str
         setValueMarshaller(LocationMarshaller.INSTANCE);
     }
 
+    @Override
     public void setName(String name) {
         this.name = name;
     }
 
+    @Override
     public String getName() {
         return this.name;
     }
@@ -65,6 +69,7 @@ public class PList extends ListIndex<Str
         out.writeLong(getHeadPageId());
     }
 
+    @Override
     public synchronized void destroy() throws IOException {
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@@ -76,6 +81,7 @@ public class PList extends ListIndex<Str
         }
     }
 
+    @Override
     public void addLast(final String id, final ByteSequence bs) throws IOException {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
@@ -87,6 +93,7 @@ public class PList extends ListIndex<Str
         }
     }
 
+    @Override
     public void addFirst(final String id, final ByteSequence bs) throws IOException {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
@@ -98,6 +105,7 @@ public class PList extends ListIndex<Str
         }
     }
 
+    @Override
     public boolean remove(final String id) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
@@ -110,6 +118,7 @@ public class PList extends ListIndex<Str
         return result.get();
     }
 
+    @Override
     public boolean remove(final long position) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
@@ -129,6 +138,7 @@ public class PList extends ListIndex<Str
         return result.get();
     }
 
+    @Override
     public PListEntry get(final long position) throws IOException {
         PListEntry result = null;
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@@ -147,6 +157,7 @@ public class PList extends ListIndex<Str
         return result;
     }
 
+    @Override
     public PListEntry getFirst() throws IOException {
         PListEntry result = null;
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@@ -164,6 +175,7 @@ public class PList extends ListIndex<Str
         return result;
     }
 
+    @Override
     public PListEntry getLast() throws IOException {
         PListEntry result = null;
         final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
@@ -181,19 +193,21 @@ public class PList extends ListIndex<Str
         return result;
     }
 
+    @Override
     public boolean isEmpty() {
         return size() == 0;
     }
 
+    @Override
     public PListIterator iterator() throws IOException {
-        return new PListIterator();
+        return new PListIteratorImpl();
     }
 
-    public final class PListIterator implements Iterator<PListEntry> {
+    final class PListIteratorImpl implements PListIterator {
         final Iterator<Map.Entry<String, Location>> iterator;
         final Transaction tx;
 
-        PListIterator() throws IOException {
+        PListIteratorImpl() throws IOException {
             tx = store.pageFile.tx();
             synchronized (indexLock) {
                 this.iterator = iterator(tx);

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java (from r1406369, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java&r1=1406369&r2=1406370&rev=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStoreImpl.java Tue Nov  6 22:04:50 2012
@@ -16,42 +16,36 @@
  */
 package org.apache.activemq.store.kahadb.plist;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.thread.Scheduler;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Page;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.LockFile;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
 /**
  * @org.apache.xbean.XBean
  */
-public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
-    static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
+public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
+    static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
     static final int CLOSED_STATE = 1;
@@ -70,7 +64,7 @@ public class PListStore extends ServiceS
     // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
     MetaData metaData = new MetaData(this);
     final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
-    Map<String, PList> persistentLists = new HashMap<String, PList>();
+    Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>();
     final Object indexLock = new Object();
     private Scheduler scheduler;
     private long cleanupInterval = 30000;
@@ -122,16 +116,16 @@ public class PListStore extends ServiceS
     }
 
     protected class MetaData {
-        protected MetaData(PListStore store) {
+        protected MetaData(PListStoreImpl store) {
             this.store = store;
         }
 
-        private final PListStore store;
+        private final PListStoreImpl store;
         Page<MetaData> page;
-        BTreeIndex<String, PList> lists;
+        BTreeIndex<String, PListImpl> lists;
 
         void createIndexes(Transaction tx) throws IOException {
-            this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
+            this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId());
         }
 
         void load(Transaction tx) throws IOException {
@@ -140,16 +134,16 @@ public class PListStore extends ServiceS
             this.lists.load(tx);
         }
 
-        void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
-            for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
-                Entry<String, PList> entry = i.next();
+        void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException {
+            for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) {
+                Entry<String, PListImpl> entry = i.next();
                 entry.getValue().load(tx);
                 lists.put(entry.getKey(), entry.getValue());
             }
         }
 
         public void read(DataInput is) throws IOException {
-            this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
+            this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong());
             this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
             this.lists.setValueMarshaller(new PListMarshaller(this.store));
         }
@@ -160,9 +154,9 @@ public class PListStore extends ServiceS
     }
 
     class MetaDataMarshaller extends VariableMarshaller<MetaData> {
-        private final PListStore store;
+        private final PListStoreImpl store;
 
-        MetaDataMarshaller(PListStore store) {
+        MetaDataMarshaller(PListStoreImpl store) {
             this.store = store;
         }
         public MetaData readPayload(DataInput dataIn) throws IOException {
@@ -176,18 +170,18 @@ public class PListStore extends ServiceS
         }
     }
 
-    class PListMarshaller extends VariableMarshaller<PList> {
-        private final PListStore store;
-        PListMarshaller(PListStore store) {
+    class PListMarshaller extends VariableMarshaller<PListImpl> {
+        private final PListStoreImpl store;
+        PListMarshaller(PListStoreImpl store) {
             this.store = store;
         }
-        public PList readPayload(DataInput dataIn) throws IOException {
-            PList result = new PList(this.store);
+        public PListImpl readPayload(DataInput dataIn) throws IOException {
+            PListImpl result = new PListImpl(this.store);
             result.read(dataIn);
             return result;
         }
 
-        public void writePayload(PList list, DataOutput dataOut) throws IOException {
+        public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
             list.write(dataOut);
         }
     }
@@ -196,10 +190,12 @@ public class PListStore extends ServiceS
         return this.journal;
     }
 
+    @Override
     public File getDirectory() {
         return directory;
     }
 
+    @Override
     public void setDirectory(File directory) {
         this.directory = directory;
     }
@@ -217,16 +213,17 @@ public class PListStore extends ServiceS
         }
     }
 
-    public PList getPList(final String name) throws Exception {
+    @Override
+    public PListImpl getPList(final String name) throws Exception {
         if (!isStarted()) {
             throw new IllegalStateException("Not started");
         }
         intialize();
         synchronized (indexLock) {
             synchronized (this) {
-                PList result = this.persistentLists.get(name);
+                PListImpl result = this.persistentLists.get(name);
                 if (result == null) {
-                    final PList pl = new PList(this);
+                    final PListImpl pl = new PListImpl(this);
                     pl.setName(name);
                     getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
@@ -238,7 +235,7 @@ public class PListStore extends ServiceS
                     result = pl;
                     this.persistentLists.put(name, pl);
                 }
-                final PList toLoad = result;
+                final PListImpl toLoad = result;
                 getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         toLoad.load(tx);
@@ -250,6 +247,7 @@ public class PListStore extends ServiceS
         }
     }
 
+    @Override
     public boolean removePList(final String name) throws Exception {
         boolean result = false;
         synchronized (indexLock) {
@@ -312,7 +310,7 @@ public class PListStore extends ServiceS
 
                 if (cleanupInterval > 0) {
                     if (scheduler == null) {
-                        scheduler = new Scheduler(PListStore.class.getSimpleName());
+                        scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
                         scheduler.start();
                     }
                     scheduler.executePeriodically(this, cleanupInterval);
@@ -334,12 +332,12 @@ public class PListStore extends ServiceS
     @Override
     protected synchronized void doStop(ServiceStopper stopper) throws Exception {
         if (scheduler != null) {
-            if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
+            if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
                 scheduler.stop();
                 scheduler = null;
             }
         }
-        for (PList pl : this.persistentLists.values()) {
+        for (PListImpl pl : this.persistentLists.values()) {
             pl.unload(null);
         }
         if (this.pageFile != null) {
@@ -372,13 +370,13 @@ public class PListStore extends ServiceS
                         iterator.remove();
                     }
                 }
-                List<PList> plists = null;
+                List<PListImpl> plists = null;
                 synchronized (indexLock) {
                     synchronized (this) {
-                        plists = new ArrayList<PList>(persistentLists.values());
+                        plists = new ArrayList<PListImpl>(persistentLists.values());
                     }
                 }
-                for (PList list : plists) {
+                for (PListImpl list : plists) {
                     list.claimFileLocations(candidates);
                     if (isStopping()) {
                         return;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Tue Nov  6 22:04:50 2012
@@ -20,8 +20,8 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.activemq.Service;
+import org.apache.activemq.store.PListStore;
 import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.plist.PListStore;
 
 /**
  * Holder for Usage instances for memory, store and temp files Main use case is

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java Tue Nov  6 22:04:50 2012
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.usage;
 
-import org.apache.activemq.store.kahadb.plist.PListStore;
 
+import org.apache.activemq.store.PListStore;
 
 /**
  * Used to keep track of how much of something is being used so that a

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java (from r1406369, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java&r1=1406369&r2=1406370&rev=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java Tue Nov  6 22:04:50 2012
@@ -26,7 +26,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.kahadb.plist.PList;
+import org.apache.activemq.store.PList;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
 import org.apache.activemq.util.ByteSequence;
@@ -40,10 +40,11 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class FilePendingMessageCursorTest {
-    private static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTest.class);
-    BrokerService brokerService;
-    FilePendingMessageCursor underTest;
+public class FilePendingMessageCursorTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTestSupport.class);
+    protected BrokerService brokerService;
+    protected  FilePendingMessageCursor underTest;
 
     @After
     public void stopBroker() throws Exception {
@@ -77,59 +78,4 @@ public class FilePendingMessageCursorTes
         assertFalse("cursor is not full", underTest.isFull());
     }
 
-    @Test
-    public void testAddRemoveAddIndexSize() throws Exception {
-        brokerService = new BrokerService();
-        SystemUsage usage = brokerService.getSystemUsage();
-        usage.getMemoryUsage().setLimit(1024*150);
-        String body = new String(new byte[1024]);
-        Destination destination = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null);
-
-        underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
-        underTest.setSystemUsage(usage);
-
-        LOG.info("start");
-        final PageFile pageFile =  underTest.getDiskList().getPageFile();
-        LOG.info("page count: " +pageFile.getPageCount());
-        LOG.info("free count: " + pageFile.getFreePageCount());
-        LOG.info("content size: " +pageFile.getPageContentSize());
-
-        final long initialPageCount =  pageFile.getPageCount();
-
-        final int numMessages = 1000;
-
-        for (int j=0; j<10; j++) {
-            // ensure free pages are reused
-            for (int i=0; i< numMessages; i++) {
-                ActiveMQMessage mqMessage = new ActiveMQMessage();
-                mqMessage.setStringProperty("body", body);
-                mqMessage.setMessageId(new MessageId("1:2:3:" + i));
-                mqMessage.setMemoryUsage(usage.getMemoryUsage());
-                mqMessage.setRegionDestination(destination);
-                underTest.addMessageLast(new IndirectMessageReference(mqMessage));
-            }
-            assertFalse("cursor is not full " + usage.getTempUsage(), underTest.isFull());
-
-            underTest.reset();
-            long receivedCount = 0;
-            while(underTest.hasNext()) {
-                MessageReference ref = underTest.next();
-                underTest.remove();
-                assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
-            }
-            assertEquals("got all messages back", receivedCount, numMessages);
-            LOG.info("page count: " +pageFile.getPageCount());
-            LOG.info("free count: " + pageFile.getFreePageCount());
-            LOG.info("content size: " + pageFile.getPageContentSize());
-        }
-
-        assertEquals("expected page usage", initialPageCount, pageFile.getPageCount() - pageFile.getFreePageCount() );
-
-        LOG.info("Destroy");
-        underTest.destroy();
-        LOG.info("page count: " + pageFile.getPageCount());
-        LOG.info("free count: " + pageFile.getFreePageCount());
-        LOG.info("content size: " + pageFile.getPageContentSize());
-        assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
-    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java Tue Nov  6 22:04:50 2012
@@ -37,6 +37,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -157,7 +158,7 @@ public class TempStorageBlockedBrokerTes
                 + broker.getSystemUsage().getTempUsage().getUsage());
 
         // do a cleanup
-        broker.getTempDataStore().run();
+        ((PListStoreImpl)broker.getTempDataStore()).run();
         LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
                         + broker.getSystemUsage().getTempUsage().getUsage());
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageConfigBrokerTest.java Tue Nov  6 22:04:50 2012
@@ -38,6 +38,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
 import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -164,7 +165,7 @@ public class TempStorageConfigBrokerTest
         broker.getSystemUsage().setSendFailIfNoSpace(true);
         broker.getSystemUsage().getMemoryUsage().setLimit(1048576);
         broker.getSystemUsage().getTempUsage().setLimit(2*1048576);
-        broker.getSystemUsage().getTempUsage().getStore().setJournalMaxFileLength(2*1048576);
+        ((PListStoreImpl)broker.getSystemUsage().getTempUsage().getStore()).setJournalMaxFileLength(2 * 1048576);
         broker.getSystemUsage().getStoreUsage().setLimit(20*1048576);
 
         PolicyEntry defaultPolicy = new PolicyEntry();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java Tue Nov  6 22:04:50 2012
@@ -41,7 +41,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.plist.PListStore;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
 import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
@@ -148,7 +148,7 @@ public class TempStoreDataCleanupTest {
 
         LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());
 
-        final PListStore pa = broker.getTempDataStore();
+        final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore();
         assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(),
             Wait.waitFor(new Wait.Condition() {
 

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java (from r1406369, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java&r1=1406369&r2=1406370&rev=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java Tue Nov  6 22:04:50 2012
@@ -1,81 +1,23 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.region.cursors;
+package org.apache.activemq.store.kahadb.plist;
 
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationStatistics;
-import org.apache.activemq.broker.region.IndirectMessageReference;
-import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.broker.region.*;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursorTestSupport;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.kahadb.plist.PList;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.util.ByteSequence;
-import org.junit.After;
+import org.apache.activemq.usage.SystemUsage;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class FilePendingMessageCursorTest {
-    private static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTest.class);
-    BrokerService brokerService;
-    FilePendingMessageCursor underTest;
-
-    @After
-    public void stopBroker() throws Exception {
-        if (brokerService != null) {
-            brokerService.getTempDataStore().stop();
-        }
-    }
-
-    private void createBrokerWithTempStoreLimit() throws Exception {
-        brokerService = new BrokerService();
-        SystemUsage usage = brokerService.getSystemUsage();
-        usage.getTempUsage().setLimit(1025*1024*15);
-
-        // put something in the temp store to on demand initialise it
-        PList dud = brokerService.getTempDataStore().getPList("dud");
-        dud.addFirst("A", new ByteSequence("A".getBytes()));
-    }
 
-    @Test
-    public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
-        createBrokerWithTempStoreLimit();
-        SystemUsage usage = brokerService.getSystemUsage();
-        assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
-
-        underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
-        underTest.setSystemUsage(usage);
-
-        // ok to add
-        underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE);
-
-        assertFalse("cursor is not full", underTest.isFull());
-    }
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursorTestSupport {
 
     @Test
     public void testAddRemoveAddIndexSize() throws Exception {
@@ -89,7 +31,7 @@ public class FilePendingMessageCursorTes
         underTest.setSystemUsage(usage);
 
         LOG.info("start");
-        final PageFile pageFile =  underTest.getDiskList().getPageFile();
+        final PageFile pageFile =  ((PListImpl)underTest.getDiskList()).getPageFile();
         LOG.info("page count: " +pageFile.getPageCount());
         LOG.info("free count: " + pageFile.getFreePageCount());
         LOG.info("content size: " +pageFile.getPageContentSize());
@@ -132,4 +74,6 @@ public class FilePendingMessageCursorTes
         LOG.info("content size: " + pageFile.getPageContentSize());
         assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
     }
+
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1406370&r1=1406369&r2=1406370&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Tue Nov  6 22:04:50 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.store.PList;
+import org.apache.activemq.store.PListEntry;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ByteSequence;
 import org.junit.After;
@@ -41,8 +43,8 @@ import org.slf4j.LoggerFactory;
 
 public class PListTest {
     static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
-    private PListStore store;
-    private PList plist;
+    private PListStoreImpl store;
+    private PListImpl plist;
     final ByteSequence payload = new ByteSequence(new byte[400]);
     final String idSeed = new String("Seed" + new byte[1024]);
     final Vector<Throwable> exceptions = new Vector<Throwable>();
@@ -173,7 +175,7 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStore();
+        store = new PListStoreImpl();
         store.setCleanupInterval(400);
         store.setDirectory(directory);
         store.setJournalMaxFileLength(1024*5);
@@ -263,7 +265,7 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStore();
+        store = new PListStoreImpl();
         store.setDirectory(directory);
         store.start();
 
@@ -294,7 +296,7 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStore();
+        store = new PListStoreImpl();
         store.setDirectory(directory);
         store.start();
 
@@ -312,7 +314,7 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStore();
+        store = new PListStoreImpl();
         store.setDirectory(directory);
         store.setJournalMaxFileLength(1024*5);
         store.setCleanupInterval(5000);
@@ -394,7 +396,7 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStore();
+        store = new PListStoreImpl();
         store.setIndexEnablePageCaching(enablePageCache);
         store.setIndexPageSize(2*1024);
         store.setDirectory(directory);
@@ -443,7 +445,7 @@ public class PListTest {
         store.stop();
         IOHelper.mkdirs(directory);
         IOHelper.deleteChildren(directory);
-        store = new PListStore();
+        store = new PListStoreImpl();
         store.setIndexPageSize(2*1024);
         store.setJournalMaxFileLength(1024*1024);
         store.setDirectory(directory);
@@ -502,7 +504,7 @@ public class PListTest {
         public void run() {
             final String threadName = Thread.currentThread().getName();
             try {
-                PList plist = null;
+                PListImpl plist = null;
                 switch (task) {
                     case CREATE:
                         Thread.currentThread().setName("C:"+id);
@@ -625,7 +627,7 @@ public class PListTest {
     }
 
     protected void startStore(File directory) throws Exception {
-        store = new PListStore();
+        store = new PListStoreImpl();
         store.setDirectory(directory);
         store.start();
         plist = store.getPList("main");



Mime
View raw message