activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r597581 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: kaha/impl/async/AsyncDataManager.java kaha/impl/async/DataFile.java store/kahadaptor/KahaReferenceStoreAdapter.java store/kahadaptor/KahaTopicReferenceStore.java
Date Fri, 23 Nov 2007 07:40:52 GMT
Author: rajdavies
Date: Thu Nov 22 23:40:51 2007
New Revision: 597581

URL: http://svn.apache.org/viewvc?rev=597581&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1500

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=597581&r1=597580&r2=597581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Thu Nov 22 23:40:51 2007
@@ -28,6 +28,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,10 +40,11 @@
 import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+
+
 /**
  * Manages DataFiles
  * 
@@ -87,6 +89,7 @@
     private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
 
     private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    private Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
     private DataFile currentWriteFile;
 
     private Location mark;
@@ -131,7 +134,7 @@
                 return dir.equals(directory) && n.startsWith(filePrefix);
             }
         });
-
+       
         if (files != null) {
             for (int i = 0; i < files.length; i++) {
                 try {
@@ -157,6 +160,7 @@
                     currentWriteFile.linkAfter(df);
                 }
                 currentWriteFile = df;
+                fileByFileMap.put(df.getFile(), df);
             }
         }
 
@@ -254,8 +258,10 @@
             int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
 
             String fileName = filePrefix + nextNum;
-            DataFile nextWriteFile = new DataFile(new File(directory, fileName), nextNum, preferedFileLength);
+            File file = new File(directory, fileName);
+            DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
             fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+            fileByFileMap.put(file, nextWriteFile);
             if (currentWriteFile != null) {
                 currentWriteFile.linkAfter(nextWriteFile);
                 if (currentWriteFile.isUnused()) {
@@ -289,6 +295,16 @@
         }
         return dataFile;
     }
+    
+    File getFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
+        }
+        return dataFile.getFile();
+    }
 
     private DataFile getNextDataFile(DataFile dataFile) {
         return (DataFile)dataFile.getNext();
@@ -303,6 +319,7 @@
         storeState(false);
         appender.close();
         fileMap.clear();
+        fileByFileMap.clear();
         controlFile.unlock();
         controlFile.dispose();
         started = false;
@@ -327,6 +344,7 @@
             result &= dataFile.delete();
         }
         fileMap.clear();
+        fileByFileMap.clear();
         lastAppendLocation.set(null);
         mark = null;
         currentWriteFile = null;
@@ -415,6 +433,7 @@
     private synchronized void forceRemoveDataFile(DataFile dataFile)
             throws IOException {
         accessorPool.disposeDataFileAccessors(dataFile);
+        fileByFileMap.remove(dataFile.getFile());
         DataFile removed = fileMap.remove(dataFile.getDataFileId());
         storeSize.addAndGet(-dataFile.getLength());
         dataFile.unlink();
@@ -461,16 +480,6 @@
                     cur = new Location();
                     cur.setDataFileId(head.getDataFileId());
                     cur.setOffset(0);
-
-                    // DataFileAccessor reader =
-                    // accessorPool.openDataFileAccessor(head);
-                    // try {
-                    // if( !reader.readLocationDetailsAndValidate(cur) ) {
-                    // return null;
-                    // }
-                    // } finally {
-                    // accessorPool.closeDataFileAccessor(reader);
-                    // }
                 } else {
                     // Set to the next offset..
                     cur = new Location(location);
@@ -509,6 +518,64 @@
             }
         }
     }
+    
+    public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
+        DataFile df = fileByFileMap.get(file);
+        return getNextLocation(df, lastLocation,thisFileOnly);
+    }
+    
+    public synchronized Location getNextLocation(DataFile dataFile,
+            Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (lastLocation == null) {
+                    DataFile head = (DataFile)dataFile.getHeadNode();
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    cur = new Location(lastLocation);
+                    cur.setOffset(cur.getOffset() + cur.getSize());
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+            
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                if (thisFileOnly) {
+                    return null;
+                }else {
+                dataFile = getNextDataFile(dataFile);
+                if (dataFile == null) {
+                    return null;
+                } else {
+                    cur.setDataFileId(dataFile.getDataFileId().intValue());
+                    cur.setOffset(0);
+                }
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() > 0) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
 
     public ByteSequence read(Location location) throws IOException, IllegalStateException {
         DataFile dataFile = getDataFile(location);
@@ -610,5 +677,13 @@
         if( currentWriteFile==null )
             return null;
         return currentWriteFile.getDataFileId();
+    }
+    
+    /**
+     * Get a set of files - only valid after start()
+     * @return files currently being used
+     */
+    public Set<File> getFiles(){
+        return fileByFileMap.keySet();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?rev=597581&r1=597580&r2=597581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Thu Nov 22 23:40:51 2007
@@ -43,6 +43,10 @@
         this.dataFileId = Integer.valueOf(number);
         length = (int)(file.exists() ? file.length() : 0);
     }
+    
+    File getFile() {
+        return file;
+    }
 
     public Integer getDataFileId() {
         return dataFileId;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=597581&r1=597580&r2=597581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Thu Nov 22 23:40:51 2007
@@ -91,11 +91,13 @@
         boolean empty = store.getMapContainerIds().isEmpty();
         stateMap = store.getMapContainer("state", STORE_STATE);
         stateMap.load();
+        storeValid=true;
         if (!empty) {
             AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
             if (status != null) {
                 storeValid = status.get();
             }
+           
             if (storeValid) {
                 //check what version the indexes are at
                 Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
@@ -236,7 +238,9 @@
      * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
      */
     public void clearMessages() throws IOException {
-        deleteAllMessages();
+        //don't delete messages as it will clear state - call base
+        //class method to clear out the data instead
+        super.deleteAllMessages();
     }
 
     /**
@@ -247,6 +251,7 @@
     public void recoverState() throws IOException {
         for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) {
             SubscriptionInfo info = i.next();
+            LOG.info("Recovering subscriber state for durable subscriber: " + info);
             TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
             ts.addSubsciption(info, false);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=597581&r1=597580&r2=597581&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Nov 22 23:40:51 2007
@@ -215,8 +215,9 @@
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return subscriberContainer.values()
+        SubscriptionInfo[] result = subscriberContainer.values()
             .toArray(new SubscriptionInfo[subscriberContainer.size()]);
+        return result;
     }
 
     public int getMessageCount(String clientId, String subscriberName) throws IOException {



Mime
View raw message