activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1416959 - in /activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store: amq/AMQPersistenceAdapter.java kahadaptor/KahaPersistenceAdapter.java
Date Tue, 04 Dec 2012 14:24:08 GMT
Author: tabish
Date: Tue Dec  4 14:24:07 2012
New Revision: 1416959

URL: http://svn.apache.org/viewvc?rev=1416959&view=rev
Log:
Tag these as deprecated.  https://issues.apache.org/jira/browse/AMQ-4125

Modified:
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java

Modified: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=1416959&r1=1416958&r2=1416959&view=diff
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Tue Dec  4 14:24:07 2012
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -50,7 +51,14 @@ import org.apache.activemq.kaha.impl.asy
 import org.apache.activemq.kaha.impl.async.Location;
 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStoreAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.Task;
@@ -71,10 +79,11 @@ import org.slf4j.LoggerFactory;
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
  * other long term persistent storage.
- * 
+ *
  * @org.apache.xbean.XBean element="amqPersistenceAdapter"
- * 
+ *
  */
+@Deprecated
 public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware,
JournaledStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
@@ -120,7 +129,7 @@ public class AMQPersistenceAdapter imple
     private RandomAccessFile lockFile;
     private FileLock lock;
     private boolean disableLocking = DISABLE_LOCKING;
-	private boolean failIfJournalIsLocked;
+    private boolean failIfJournalIsLocked;
     private boolean lockLogged;
     private boolean lockAquired;
     private boolean recoverReferenceStore=true;
@@ -132,6 +141,7 @@ public class AMQPersistenceAdapter imple
         return this.brokerName;
     }
 
+    @Override
     public void setBrokerName(String brokerName) {
         this.brokerName = brokerName;
         if (this.referenceStoreAdapter != null) {
@@ -143,10 +153,12 @@ public class AMQPersistenceAdapter imple
         return brokerService;
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
     }
 
+    @Override
     public synchronized void start() throws Exception {
         if (!started.compareAndSet(false, true)) {
             return;
@@ -154,7 +166,7 @@ public class AMQPersistenceAdapter imple
         if (this.directory == null) {
             if (brokerService != null) {
                 this.directory = brokerService.getBrokerDataDirectory();
-               
+
             } else {
                 this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
                 this.directory = new File(directory, "amqstore");
@@ -176,7 +188,7 @@ public class AMQPersistenceAdapter imple
         IOHelper.mkdirs(this.directory);
         lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
         lock();
-        LOG.info("AMQStore starting using directory: " + directory); 
+        LOG.info("AMQStore starting using directory: " + directory);
         if (archiveDataLogs) {
             IOHelper.mkdirs(this.directoryArchive);
         }
@@ -194,7 +206,7 @@ public class AMQPersistenceAdapter imple
         referenceStoreAdapter.setBrokerName(getBrokerName());
         referenceStoreAdapter.setUsageManager(usageManager);
         referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
-        
+
         if (failIfJournalIsLocked) {
             asyncDataManager.lock();
         } else {
@@ -211,7 +223,7 @@ public class AMQPersistenceAdapter imple
                 }
             }
         }
-        
+
         asyncDataManager.start();
         if (deleteAllMessages) {
             asyncDataManager.delete();
@@ -234,6 +246,7 @@ public class AMQPersistenceAdapter imple
         LOG.info("Active data files: " + files);
         checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
 
+            @Override
             public boolean iterate() {
                 doCheckpoint();
                 return false;
@@ -268,6 +281,7 @@ public class AMQPersistenceAdapter imple
         // Do a checkpoint periodically.
         periodicCheckpointTask = new Runnable() {
 
+            @Override
             public void run() {
                 checkpoint(false);
             }
@@ -275,12 +289,13 @@ public class AMQPersistenceAdapter imple
         scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
         periodicCleanupTask = new Runnable() {
 
+            @Override
             public void run() {
                 cleanup();
             }
         };
         scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
-        
+
         if (lockAquired && lockLogged) {
             LOG.info("Aquired lock for AMQ Store" + getDirectory());
             if (brokerService != null) {
@@ -290,6 +305,7 @@ public class AMQPersistenceAdapter imple
 
     }
 
+    @Override
     public void stop() throws Exception {
 
         if (!started.compareAndSet(true, false)) {
@@ -344,9 +360,10 @@ public class AMQPersistenceAdapter imple
 
     /**
      * When we checkpoint we move all the journalled data to long term storage.
-     * 
+     *
      * @param sync
      */
+    @Override
     public void checkpoint(boolean sync) {
         try {
             if (asyncDataManager == null) {
@@ -374,7 +391,7 @@ public class AMQPersistenceAdapter imple
 
     /**
      * This does the actual checkpoint.
-     * 
+     *
      * @return true if successful
      */
     public boolean doCheckpoint() {
@@ -435,11 +452,11 @@ public class AMQPersistenceAdapter imple
             Set<Integer>inProgress = new HashSet<Integer>();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size()
+ ") " + dataFilesInProgress.values());
-            }      
+            }
             for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
                 inProgress.addAll(set.keySet());
             }
-            Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
+            Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
             inProgress.add(lastDataFile);
             lastDataFile = asyncDataManager.getMark().getDataFileId();
             inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
@@ -454,6 +471,7 @@ public class AMQPersistenceAdapter imple
         }
     }
 
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
         destinations.addAll(queues.keySet());
@@ -469,6 +487,7 @@ public class AMQPersistenceAdapter imple
         }
     }
 
+    @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
{
         AMQMessageStore store = queues.get(destination);
         if (store == null) {
@@ -484,6 +503,7 @@ public class AMQPersistenceAdapter imple
         return store;
     }
 
+    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws
IOException {
         AMQTopicMessageStore store = topics.get(destinationName);
         if (store == null) {
@@ -504,6 +524,7 @@ public class AMQPersistenceAdapter imple
      *
      * @param destination
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         AMQMessageStore store= queues.remove(destination);
         referenceStoreAdapter.removeQueueMessageStore(destination);
@@ -514,37 +535,43 @@ public class AMQPersistenceAdapter imple
      *
      * @param destination
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
         topics.remove(destination);
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         return transactionStore;
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return referenceStoreAdapter.getLastMessageBrokerSequenceId();
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) throws IOException {
         referenceStoreAdapter.beginTransaction(context);
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context) throws IOException {
         referenceStoreAdapter.commitTransaction(context);
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         referenceStoreAdapter.rollbackTransaction(context);
     }
-    
+
     public boolean isPersistentIndex() {
-		return persistentIndex;
-	}
+        return persistentIndex;
+    }
 
-	public void setPersistentIndex(boolean persistentIndex) {
-		this.persistentIndex = persistentIndex;
-	}
+    public void setPersistentIndex(boolean persistentIndex) {
+        this.persistentIndex = persistentIndex;
+    }
 
     /**
      * @param location
@@ -563,7 +590,7 @@ public class AMQPersistenceAdapter imple
     /**
      * Move all the messages that were in the journal into long term storage. We
      * just replay and do a checkpoint.
-     * 
+     *
      * @throws IOException
      * @throws IOException
      * @throws IllegalStateException
@@ -696,14 +723,14 @@ public class AMQPersistenceAdapter imple
     public Location writeCommand(DataStructure command, boolean syncHint) throws IOException
{
         return writeCommand(command, syncHint,false);
     }
-    
+
     public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync)
throws IOException {
-    	try {
-    		return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint &&
syncOnWrite)));
-    	} catch (IOException ioe) {
-    		LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
-        	brokerService.handleIOException(ioe);
-        	throw ioe;
+        try {
+            return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint
&& syncOnWrite)));
+        } catch (IOException ioe) {
+            LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
+            brokerService.handleIOException(ioe);
+            throw ioe;
         }
     }
 
@@ -713,6 +740,7 @@ public class AMQPersistenceAdapter imple
         return writeCommand(trace, sync);
     }
 
+    @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         newPercentUsage = (newPercentUsage / 10) * 10;
         oldPercentUsage = (oldPercentUsage / 10) * 10;
@@ -725,6 +753,7 @@ public class AMQPersistenceAdapter imple
         return transactionStore;
     }
 
+    @Override
     public synchronized void deleteAllMessages() throws IOException {
         deleteAllMessages = true;
     }
@@ -743,7 +772,7 @@ public class AMQPersistenceAdapter imple
         manager.setDirectoryArchive(getDirectoryArchive());
         manager.setArchiveDataLogs(isArchiveDataLogs());
         manager.setMaxFileLength(maxFileLength);
-        manager.setUseNio(useNio);    
+        manager.setUseNio(useNio);
         return manager;
     }
 
@@ -796,6 +825,7 @@ public class AMQPersistenceAdapter imple
         return usageManager;
     }
 
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
         this.usageManager = usageManager;
     }
@@ -804,7 +834,7 @@ public class AMQPersistenceAdapter imple
         return maxCheckpointMessageAddSize;
     }
 
-    /** 
+    /**
      * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
      */
@@ -812,11 +842,13 @@ public class AMQPersistenceAdapter imple
         this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
     }
 
-   
+
+    @Override
     public synchronized File getDirectory() {
         return directory;
     }
 
+    @Override
     public synchronized void setDirectory(File directory) {
         this.directory = directory;
     }
@@ -828,39 +860,40 @@ public class AMQPersistenceAdapter imple
     public void setSyncOnWrite(boolean syncOnWrite) {
         this.syncOnWrite = syncOnWrite;
     }
-    
+
     /**
      * @param referenceStoreAdapter the referenceStoreAdapter to set
      */
     public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
         this.referenceStoreAdapter = referenceStoreAdapter;
     }
-    
+
+    @Override
     public long size(){
         return storeSize.get();
     }
 
-	public boolean isUseNio() {
-		return useNio;
-	}
-
-	public void setUseNio(boolean useNio) {
-		this.useNio = useNio;
-	}
-
-	public int getMaxFileLength() {
-		return maxFileLength;
-	}
+    public boolean isUseNio() {
+        return useNio;
+    }
+
+    public void setUseNio(boolean useNio) {
+        this.useNio = useNio;
+    }
+
+    public int getMaxFileLength() {
+        return maxFileLength;
+    }
 
-	 /**
+     /**
       * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
       * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
       */
-	public void setMaxFileLength(int maxFileLength) {
-		this.maxFileLength = maxFileLength;
-	}
-	
-	public long getCleanupInterval() {
+    public void setMaxFileLength(int maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public long getCleanupInterval() {
         return cleanupInterval;
     }
 
@@ -875,7 +908,7 @@ public class AMQPersistenceAdapter imple
     public void setCheckpointInterval(long checkpointInterval) {
         this.checkpointInterval = checkpointInterval;
     }
-    
+
     public int getIndexBinSize() {
         return indexBinSize;
     }
@@ -895,7 +928,7 @@ public class AMQPersistenceAdapter imple
     public int getIndexPageSize() {
         return indexPageSize;
     }
-    
+
     public int getIndexMaxBinSize() {
         return indexMaxBinSize;
     }
@@ -911,15 +944,15 @@ public class AMQPersistenceAdapter imple
     public void setIndexPageSize(int indexPageSize) {
         this.indexPageSize = indexPageSize;
     }
-    
+
     public void setIndexLoadFactor(int factor){
-    	this.indexLoadFactor=factor;    
+        this.indexLoadFactor=factor;
     }
-    
+
     public int getIndexLoadFactor(){
-    	return this.indexLoadFactor;
+        return this.indexLoadFactor;
     }
-    
+
     public int getMaxReferenceFileLength() {
         return maxReferenceFileLength;
     }
@@ -931,7 +964,7 @@ public class AMQPersistenceAdapter imple
     public void setMaxReferenceFileLength(int maxReferenceFileLength) {
         this.maxReferenceFileLength = maxReferenceFileLength;
     }
-    
+
     public File getDirectoryArchive() {
         return directoryArchive;
     }
@@ -946,8 +979,8 @@ public class AMQPersistenceAdapter imple
 
     public void setArchiveDataLogs(boolean archiveDataLogs) {
         this.archiveDataLogs = archiveDataLogs;
-    }  
-    
+    }
+
     public boolean isDisableLocking() {
         return disableLocking;
     }
@@ -955,7 +988,7 @@ public class AMQPersistenceAdapter imple
     public void setDisableLocking(boolean disableLocking) {
         this.disableLocking = disableLocking;
     }
-    
+
     /**
      * @return the recoverReferenceStore
      */
@@ -983,15 +1016,15 @@ public class AMQPersistenceAdapter imple
     public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
         this.forceRecoverReferenceStore = forceRecoverReferenceStore;
     }
-    
+
     public boolean isUseDedicatedTaskRunner() {
         return useDedicatedTaskRunner;
     }
-    
+
     public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
         this.useDedicatedTaskRunner = useDedicatedTaskRunner;
     }
-    
+
     /**
      * @return the journalThreadPriority
      */
@@ -1006,27 +1039,27 @@ public class AMQPersistenceAdapter imple
         this.journalThreadPriority = journalThreadPriority;
     }
 
-	
-	protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
-	    Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
-	    if (map == null) {
-	        map = new ConcurrentHashMap<Integer, AtomicInteger>();
-	        dataFilesInProgress.put(store, map);
-	    }
-	    AtomicInteger count = map.get(dataFileId);
-	    if (count == null) {
-	        count = new AtomicInteger(0);
-	        map.put(dataFileId, count);
-	    }
-	    count.incrementAndGet();
-	}
-	
-	protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
+
+    protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
+        Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
+        if (map == null) {
+            map = new ConcurrentHashMap<Integer, AtomicInteger>();
+            dataFilesInProgress.put(store, map);
+        }
+        AtomicInteger count = map.get(dataFileId);
+        if (count == null) {
+            count = new AtomicInteger(0);
+            map.put(dataFileId, count);
+        }
+        count.incrementAndGet();
+    }
+
+    protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
         Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
         if (map != null) {
             AtomicInteger count = map.get(dataFileId);
             if (count != null) {
-                int newCount = count.decrementAndGet(); 
+                int newCount = count.decrementAndGet();
                 if (newCount <=0) {
                     map.remove(dataFileId);
                 }
@@ -1036,9 +1069,9 @@ public class AMQPersistenceAdapter imple
             }
         }
     }
-	
-	
-	protected void lock() throws Exception {
+
+
+    protected void lock() throws Exception {
         lockLogged = false;
         lockAquired = false;
         do {
@@ -1054,8 +1087,8 @@ public class AMQPersistenceAdapter imple
 
         } while (!lockAquired && !disableLocking);
     }
-	
-	private synchronized void unlock() throws IOException {
+
+    private synchronized void unlock() throws IOException {
         if (!disableLocking && (null != lock)) {
             //clear property doesn't work on some platforms
             System.getProperties().remove(getPropertyKey());
@@ -1064,16 +1097,16 @@ public class AMQPersistenceAdapter imple
             if (lock.isValid()) {
                 lock.release();
                 lock.channel().close();
-                
+
             }
             lock = null;
         }
     }
 
-	
-	protected boolean doLock() throws IOException {
-	    boolean result = true;
-	    if (!disableLocking && directory != null && lock == null) {
+
+    protected boolean doLock() throws IOException {
+        boolean result = true;
+        if (!disableLocking && directory != null && lock == null) {
             String key = getPropertyKey();
             String property = System.getProperty(key);
             if (null == property) {
@@ -1089,23 +1122,24 @@ public class AMQPersistenceAdapter imple
                 result = false;
             }
         }
-	    return result;
-	}
-	
-	private String getPropertyKey() throws IOException {
+        return result;
+    }
+
+    private String getPropertyKey() throws IOException {
         return getClass().getName() + ".lock." + directory.getCanonicalPath();
     }
-	
-	static {
-	    BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
-	            + ".FileLockBroken",
-	            "false"));
-	    DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
-	           + ".DisableLocking",
-	           "false"));
-	}
 
-	
+    static {
+        BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+                + ".FileLockBroken",
+                "false"));
+        DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+               + ".DisableLocking",
+               "false"));
+    }
+
+
+    @Override
     public long getLastProducerSequenceId(ProducerId id) {
         // reference store send has adequate duplicate suppression
         return -1;

Modified: activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=1416959&r1=1416958&r2=1416959&view=diff
==============================================================================
--- activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-amq-store/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
Tue Dec  4 14:24:07 2012
@@ -55,8 +55,9 @@ import org.slf4j.LoggerFactory;
 
 /**
  * @org.apache.xbean.XBean
- * 
+ *
  */
+@Deprecated
 public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
 
     private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -78,15 +79,15 @@ public class KahaPersistenceAdapter impl
     private boolean persistentIndex = true;
     private BrokerService brokerService;
 
-    
     public KahaPersistenceAdapter(AtomicLong size) {
         this.storeSize=size;
     }
-    
+
     public KahaPersistenceAdapter() {
         this(new AtomicLong());
     }
-    
+
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
         try {
@@ -104,6 +105,7 @@ public class KahaPersistenceAdapter impl
         return rc;
     }
 
+    @Override
     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws
IOException {
         MessageStore rc = queues.get(destination);
         if (rc == null) {
@@ -117,6 +119,7 @@ public class KahaPersistenceAdapter impl
         return rc;
     }
 
+    @Override
     public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
         throws IOException {
         TopicMessageStore rc = topics.get(destination);
@@ -143,14 +146,15 @@ public class KahaPersistenceAdapter impl
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         queues.remove(destination);
         try{
-        	if(theStore!=null){
-        		theStore.deleteMapContainer(destination,"queue-data");
-        	}
+            if(theStore!=null){
+                theStore.deleteMapContainer(destination,"queue-data");
+            }
         }catch(IOException e ){
-        	LOG.error("Failed to remove store map container for queue:"+destination, e);
+            LOG.error("Failed to remove store map container for queue:"+destination, e);
         }
     }
 
@@ -159,6 +163,7 @@ public class KahaPersistenceAdapter impl
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
         topics.remove(destination);
     }
@@ -168,6 +173,7 @@ public class KahaPersistenceAdapter impl
         return result;
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
             while (true) {
@@ -194,32 +200,39 @@ public class KahaPersistenceAdapter impl
         return transactionStore;
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context) throws IOException {
         if (theStore != null) {
             theStore.force();
         }
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void start() throws Exception {
         initialize();
     }
 
+    @Override
     public void stop() throws Exception {
         if (theStore != null) {
             theStore.close();
         }
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
 
+    @Override
     public void deleteAllMessages() throws IOException {
         if (theStore != null) {
             if (theStore.isInitialized()) {
@@ -268,6 +281,7 @@ public class KahaPersistenceAdapter impl
      * @param usageManager The UsageManager that is controlling the broker's
      *                memory usage.
      */
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
     }
 
@@ -277,14 +291,14 @@ public class KahaPersistenceAdapter impl
     public long getMaxDataFileLength() {
         return maxDataFileLength;
     }
-    
+
     public boolean isPersistentIndex() {
-		return persistentIndex;
-	}
+        return persistentIndex;
+    }
 
-	public void setPersistentIndex(boolean persistentIndex) {
-		this.persistentIndex = persistentIndex;
-	}
+    public void setPersistentIndex(boolean persistentIndex) {
+        this.persistentIndex = persistentIndex;
+    }
 
     /**
      * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
@@ -300,7 +314,7 @@ public class KahaPersistenceAdapter impl
         }
         return theStore;
     }
-    
+
     protected final Store createStore() throws IOException {
         Store result = StoreFactory.open(getStoreDirectory(), "rw",storeSize);
         result.setMaxDataFileLength(maxDataFileLength);
@@ -319,10 +333,12 @@ public class KahaPersistenceAdapter impl
         return directory;
     }
 
+    @Override
     public String toString() {
         return "KahaPersistenceAdapter(" + getStoreName() + ")";
     }
 
+    @Override
     public void setBrokerName(String brokerName) {
         this.brokerName = brokerName;
     }
@@ -331,20 +347,24 @@ public class KahaPersistenceAdapter impl
         return brokerName;
     }
 
+    @Override
     public File getDirectory() {
         return this.directory;
     }
 
+    @Override
     public void setDirectory(File directory) {
         this.directory = directory;
     }
 
+    @Override
     public void checkpoint(boolean sync) throws IOException {
         if (sync) {
             getStore().force();
         }
     }
-   
+
+    @Override
     public long size(){
        return storeSize.get();
     }
@@ -367,14 +387,16 @@ public class KahaPersistenceAdapter impl
         }
     }
 
-	public void setBrokerService(BrokerService brokerService) {
-		this.brokerService = brokerService;
-	}
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
 
+    @Override
     public long getLastProducerSequenceId(ProducerId id) {
         // reference store send has adequate duplicate suppression
         return -1;
     }
-  
+
 
 }



Mime
View raw message