activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r632964 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash: HashBin.java HashIndex.java HashPage.java HashPageInfo.java
Date Mon, 03 Mar 2008 07:18:01 GMT
Author: rajdavies
Date: Sun Mar  2 23:18:00 2008
New Revision: 632964

URL: http://svn.apache.org/viewvc?rev=632964&view=rev
Log:
Compress HashIndex on startup - only way to ensure the index
pages are loaded in correct order without changing the wire format

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java Sun Mar  2 23:18:00 2008
@@ -92,7 +92,7 @@
         return size;
     }
 
-    HashPageInfo addHashPageInfo(long id, int size) {
+    HashPageInfo addHashPageInfo(long id, int size) throws IOException {
         HashPageInfo info = new HashPageInfo(hashIndex);
         info.setId(id);
         info.setSize(size);
@@ -105,7 +105,7 @@
         HashEntry result = null;
         try {
             int low = 0;
-            int high = size() - 1;
+            int high = size()-1;
             while (low <= high) {
                 int mid = (low + high) >> 1;
                 HashEntry te = getHashEntry(mid);
@@ -129,7 +129,7 @@
         boolean replace = false;
         try {
             int low = 0;
-            int high = size() - 1;
+            int high = size()-1;
             while (low <= high) {
                 int mid = (low + high) >> 1;
                 HashEntry midVal = getHashEntry(mid);
@@ -223,7 +223,7 @@
         HashPageInfo page = getRetrievePage(index);
         int offset = getRetrieveOffset(index);
         HashEntry result = page.removeHashEntry(offset);
-       
+        
         if (page.isEmpty()) {
             hashPages.remove(page);
             hashIndex.releasePage(page.getPage());
@@ -280,7 +280,7 @@
             // overflowed
             info.begin();
             HashEntry entry = info.removeHashEntry(info.size() - 1);
-            doOverFlow(hashPages.indexOf(info) + 1, entry);
+            doOverFlow(hashPages.indexOf(info)+1, entry);
         }
     }
 
@@ -298,13 +298,22 @@
         if (info.size() > maximumEntries) {
             // overflowed
             HashEntry overflowed = info.removeHashEntry(info.size() - 1);
-            doOverFlow(pageNo + 1, overflowed);
+            doOverFlow(pageNo+1, overflowed);
         }
     }
 
     private void doUnderFlow(int index) {
     }
 
+    String dump() throws IOException {
+        String str = "[" + hashPages.size()+"]";
+        for (HashPageInfo page : hashPages) {
+            page.begin();
+            str +=page.dump();
+            page.end();
+        }
+        return str;
+    }
     private void end() throws IOException {
         for (HashPageInfo info : hashPages) {
             info.end();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Sun Mar  2 23:18:00 2008
@@ -17,8 +17,10 @@
 package org.apache.activemq.kaha.impl.index.hash;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.kaha.Marshaller;
@@ -65,7 +67,7 @@
     private int pageCacheSize = 10;
     private int size;
     private int activeBins;
-
+    
     
     /**
      * Constructor
@@ -198,29 +200,17 @@
             readBuffer = new byte[pageSize];
             try {
                 openIndexFile();
-                long offset = 0;
-                while ((offset + pageSize) <= indexFile.length()) {
-                    indexFile.seek(offset);
-                    indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
-                    dataIn.restart(readBuffer);
-                    HashPage page = new HashPage(keysPerPage);
-                    page.setId(offset);
-                    page.readHeader(dataIn);
-                    if (!page.isActive()) {
-                        freeList.add(page);
-                    } else {
-                        addToBin(page);
-                        size+=page.size();
-                    }
-                    offset += pageSize;
+                if (indexFile.length() > 0) {
+                    doCompress();
                 }
-                length = offset;
             } catch (IOException e) {
                 LOG.error("Failed to load index ", e);
                 throw new RuntimeException(e);
             }
         }
     }
+    
+    
 
     public synchronized void unload() throws IOException {
         if (loaded.compareAndSet(true, false)) {
@@ -228,6 +218,7 @@
                 indexFile.close();
                 indexFile = null;
                 freeList.clear();
+                pageCache.clear();
                 bins = new HashBin[bins.length];
             }
         }
@@ -330,6 +321,7 @@
             result = freeList.removeFirst();
             result.setActive(true);
             result.reset();
+            writePageHeader(result);
         }
         return result;
     }
@@ -371,7 +363,7 @@
         return page;
     }
 
-    void addToBin(HashPage page) {
+    void addToBin(HashPage page) throws IOException {
         HashBin bin = getBin(page.getBinId());
         bin.addHashPageInfo(page.getId(), page.getPersistedSize());
     }
@@ -393,7 +385,7 @@
             indexFile = new RandomAccessFile(file, "rw");
         }
     }
-
+    
     private HashBin getBin(Object key) {
         int hash = hash(key);
         int i = indexFor(hash, bins.length);
@@ -419,6 +411,61 @@
             pageCache.remove(page.getId());
         }
     }
+    
+    private void doLoad() throws IOException {
+        long offset = 0;
+        if (loaded.compareAndSet(false, true)) {
+            while ((offset + pageSize) <= indexFile.length()) {
+                indexFile.seek(offset);
+                indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
+                dataIn.restart(readBuffer);
+                HashPage page = new HashPage(keysPerPage);
+                page.setId(offset);
+                page.readHeader(dataIn);
+                if (!page.isActive()) {
+                    page.reset();
+                    freeList.add(page);
+                } else {
+                    addToBin(page);
+                    size+=page.size();
+                }
+                offset += pageSize;
+            }
+            length=offset;
+        }
+    }
+    
+    private void doCompress() throws IOException {
+        String backFileName = name + "-COMPRESS";
+        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
+        backIndex.setKeyMarshaller(keyMarshaller);
+        backIndex.setKeySize(getKeySize());
+        backIndex.setNumberOfBins(getNumberOfBins());
+        backIndex.setPageSize(getPageSize());
+        backIndex.load();
+        File backFile = backIndex.file;
+        long offset = 0;
+        while ((offset + pageSize) <= indexFile.length()) {
+            indexFile.seek(offset);
+            HashPage page = getFullPage(offset);
+            if (page.isActive()) {
+                for (HashEntry entry : page.getEntries()) {
+                    backIndex.getBin(entry.getKey()).put(entry);
+                    backIndex.size++;
+                }
+            }
+            offset += pageSize;
+        }
+        backIndex.unload();
+      
+        unload();
+        IOHelper.deleteFile(file);
+        IOHelper.copyFile(backFile, file);
+        IOHelper.deleteFile(backFile);
+        openIndexFile();
+        doLoad();
+    }
+      
 
     static int hash(Object x) {
         int h = x.hashCode();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java Sun Mar  2 23:18:00 2008
@@ -38,8 +38,8 @@
     private int maximumEntries;
     private long id;
     private int binId;
-    private int persistedSize;
     private List<HashEntry> hashIndexEntries;
+    private int persistedSize;
     /*
      * for persistence only
      */
@@ -71,7 +71,7 @@
     }
 
     public String toString() {
-        return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " + hashIndexEntries.size();
+        return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " + persistedSize;
     }
 
     public boolean equals(Object o) {
@@ -95,14 +95,7 @@
         this.active = active;
     }
 
-    long getNextFreePageId() {
-        return this.nextFreePageId;
-    }
-
-    void setNextFreePageId(long nextPageId) {
-        this.nextFreePageId = nextPageId;
-    }
-
+    
     long getId() {
         return id;
     }
@@ -116,8 +109,9 @@
     }
 
     void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
+        persistedSize=hashIndexEntries.size();
         writeHeader(dataOut);
-        dataOut.writeInt(hashIndexEntries.size());
+        dataOut.writeInt(persistedSize);
         for (HashEntry entry : hashIndexEntries) {
             entry.write(keyMarshaller, dataOut);
         }
@@ -125,7 +119,8 @@
 
     void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
         readHeader(dataIn);
-        int size = dataIn.readInt();
+        dataIn.readInt();
+        int size = persistedSize;
         hashIndexEntries.clear();
         for (int i = 0; i < size; i++) {
             HashEntry entry = new HashEntry();
@@ -145,8 +140,10 @@
         dataOut.writeBoolean(isActive());
         dataOut.writeLong(nextFreePageId);
         dataOut.writeInt(binId);
-        dataOut.writeInt(size());
+        persistedSize=hashIndexEntries.size();
+        dataOut.writeInt(persistedSize);
     }
+    
 
     boolean isEmpty() {
         return hashIndexEntries.isEmpty();
@@ -186,12 +183,10 @@
 
     void reset() throws IOException {
         hashIndexEntries.clear();
-        setNextFreePageId(HashEntry.NOT_SET);
+        persistedSize=0;
     }
 
     void addHashEntry(int index, HashEntry entry) throws IOException {
-        // index = index >= 0 ? index : 0;
-        // index = (index == 0 || index< size()) ? index : size()-1;
         hashIndexEntries.add(index, entry);
     }
 
@@ -227,7 +222,7 @@
         this.binId = binId;
     }
 
-    void dump() {
+    String dump() {
 
         StringBuffer str = new StringBuffer(32);
         str.append(toString());
@@ -236,6 +231,6 @@
             str.append(entry);
             str.append(",");
         }
-        LOG.info(str);
+        return str.toString();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java?rev=632964&r1=632963&r2=632964&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java Sun Mar  2 23:18:00 2008
@@ -86,8 +86,8 @@
         return result;
     }
 
-    void dump() {
-        page.dump();
+    String dump() {
+        return page.dump();
     }
 
     void begin() throws IOException {



Mime
View raw message