accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1362072 - in /accumulo/trunk/core/src: main/java/org/apache/accumulo/core/file/blockfile/ main/java/org/apache/accumulo/core/file/blockfile/cache/ main/java/org/apache/accumulo/core/file/blockfile/impl/ main/java/org/apache/accumulo/core/f...
Date Mon, 16 Jul 2012 14:53:59 GMT
Author: kturner
Date: Mon Jul 16 14:53:58 2012
New Revision: 1362072

URL: http://svn.apache.org/viewvc?rev=1362072&view=rev
Log:
ACCUMULO-473 added transient indexing to cached rfile blocks

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
Mon Jul 16 14:53:58 2012
@@ -34,4 +34,13 @@ public interface ABlockReader extends Da
   
   public void close() throws IOException;
   
+  /**
+   * An indexable block supports seeking, getting a position, and associating an arbitrary
index with the block
+   * 
+   * @return
+   */
+  public boolean isIndexable();
+  public void seek(int position);
+  public int getPosition();
+  <T> T getIndex(Class<T> clazz);
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
Mon Jul 16 14:53:58 2012
@@ -33,7 +33,7 @@ public interface BlockCache {
    * @param inMemory
    *          Whether block should be treated as in-memory
    */
-  public void cacheBlock(String blockName, byte buf[], boolean inMemory);
+  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory);
   
   /**
    * Add block to cache (defaults to not in-memory).
@@ -43,7 +43,7 @@ public interface BlockCache {
    * @param buf
    *          The block contents wrapped in a ByteBuffer.
    */
-  public void cacheBlock(String blockName, byte buf[]);
+  public CacheEntry cacheBlock(String blockName, byte buf[]);
   
   /**
    * Fetch block from cache.
@@ -52,10 +52,15 @@ public interface BlockCache {
    *          Block number to fetch.
    * @return Block or null if block is not in the cache.
    */
-  public byte[] getBlock(String blockName);
+  public CacheEntry getBlock(String blockName);
   
   /**
    * Shutdown the cache.
    */
   public void shutdown();
+  
+  /**
+   * @return
+   */
+  public long getMaxSize();
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
Mon Jul 16 14:53:58 2012
@@ -16,40 +16,11 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache;
 
-public class CacheEntry {
-  private String fName;
-  private Long hash;
+public interface CacheEntry {
+  byte[] getBuffer();
   
-  public CacheEntry(String name, Long time) {
-    this.hash = time;
-    this.fName = name;
-  }
+  public Object getIndex();
   
-  @Override
-  public boolean equals(Object other) {
-    return
-    
-    ((CacheEntry) other).getName().equals(fName) && ((CacheEntry) other).getHashInfo().equals(hash)
&& ((CacheEntry) other).getName().equals(fName)
-        && ((CacheEntry) other).getHashInfo().equals(hash);
-    
-  }
-  
-  @Override
-  public int hashCode() {
-    return fName.hashCode() + hash.hashCode();
-  }
-  
-  public String getName() {
-    return fName;
-  }
-  
-  public Long getHashInfo() {
-    
-    return this.hash;
-  }
-  
-  public long length() {
-    return fName.length() + Long.SIZE;
-  }
+  public void setIndex(Object idx);
   
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
Mon Jul 16 14:53:58 2012
@@ -19,6 +19,7 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache;
 
+
 /**
  * Represents an entry in the {@link LruBlockCache}.
  * 
@@ -26,7 +27,7 @@ package org.apache.accumulo.core.file.bl
  * Makes the block memory-aware with {@link HeapSize} and Comparable to sort by access time
for the LRU. It also takes care of priority by either instantiating
  * as in-memory or handling the transition from single to multiple access.
  */
-public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
+public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntry {
   
   public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE)
+ (2 * SizeConstants.SIZEOF_LONG)
       + ClassSize.STRING + ClassSize.BYTE_BUFFER);
@@ -51,6 +52,7 @@ public class CachedBlock implements Heap
   private volatile long accessTime;
   private long size;
   private BlockPriority priority;
+  private Object index;
   
   public CachedBlock(String blockName, byte buf[], long accessTime) {
     this(blockName, buf, accessTime, false);
@@ -88,6 +90,7 @@ public class CachedBlock implements Heap
     return this.accessTime < that.accessTime ? 1 : -1;
   }
   
+  @Override
   public byte[] getBuffer() {
     return this.buf;
   }
@@ -99,4 +102,14 @@ public class CachedBlock implements Heap
   public BlockPriority getPriority() {
     return this.priority;
   }
+  
+  @Override
+  public Object getIndex() {
+    return index;
+  }
+  
+  @Override
+  public void setIndex(Object idx) {
+    this.index = idx;
+  }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
Mon Jul 16 14:53:58 2012
@@ -247,7 +247,7 @@ public class LruBlockCache implements Bl
    * @param inMemory
    *          if block is in-memory
    */
-  public void cacheBlock(String blockName, byte buf[], boolean inMemory) {
+  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
     CachedBlock cb = map.get(blockName);
     if (cb != null) {
       stats.duplicateReads();
@@ -262,6 +262,8 @@ public class LruBlockCache implements Bl
         runEviction();
       }
     }
+    
+    return cb;
   }
   
   /**
@@ -275,8 +277,8 @@ public class LruBlockCache implements Bl
    * @param buf
    *          block buffer
    */
-  public void cacheBlock(String blockName, byte buf[]) {
-    cacheBlock(blockName, buf, false);
+  public CacheEntry cacheBlock(String blockName, byte buf[]) {
+    return cacheBlock(blockName, buf, false);
   }
   
   /**
@@ -286,7 +288,8 @@ public class LruBlockCache implements Bl
    *          block name
    * @return buffer of specified block name, or null if not in cache
    */
-  public byte[] getBlock(String blockName) {
+  
+  public CachedBlock getBlock(String blockName) {
     CachedBlock cb = map.get(blockName);
     if (cb == null) {
       stats.miss();
@@ -294,7 +297,7 @@ public class LruBlockCache implements Bl
     }
     stats.hit();
     cb.access(count.incrementAndGet());
-    return cb.getBuffer();
+    return cb;
   }
   
   protected long evictBlock(CachedBlock block) {

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
Mon Jul 16 14:53:58 2012
@@ -27,18 +27,45 @@ import java.util.Map;
  * Simple one RFile soft reference cache.
  */
 public class SimpleBlockCache implements BlockCache {
-  private static class Ref extends SoftReference<byte[]> {
+  
+  private static class SimpleCacheEntry implements CacheEntry {
+    
+    private byte[] buffer;
+    private Object index;
+    
+    SimpleCacheEntry(byte[] buffer) {
+      this.buffer = buffer;
+    }
+    
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+    
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+    
+    @Override
+    public void setIndex(Object idx) {
+      this.index = idx;
+    }
+    
+  }
+  
+  private static class Ref extends SoftReference<SimpleCacheEntry> {
     public String blockId;
     
-    public Ref(String blockId, byte buf[], ReferenceQueue<byte[]> q) {
-      super(buf, q);
+    public Ref(String blockId, SimpleCacheEntry sce, ReferenceQueue<SimpleCacheEntry>
q) {
+      super(sce, q);
       this.blockId = blockId;
     }
   }
   
   private Map<String,Ref> cache = new HashMap<String,Ref>();
   
-  private ReferenceQueue<byte[]> q = new ReferenceQueue<byte[]>();
+  private ReferenceQueue<SimpleCacheEntry> q = new ReferenceQueue<SimpleCacheEntry>();
   public int dumps = 0;
   
   /**
@@ -64,7 +91,7 @@ public class SimpleBlockCache implements
     return cache.size();
   }
   
-  public synchronized byte[] getBlock(String blockName) {
+  public synchronized SimpleCacheEntry getBlock(String blockName) {
     processQueue(); // clear out some crap.
     Ref ref = cache.get(blockName);
     if (ref == null)
@@ -72,15 +99,24 @@ public class SimpleBlockCache implements
     return ref.get();
   }
   
-  public synchronized void cacheBlock(String blockName, byte buf[]) {
-    cache.put(blockName, new Ref(blockName, buf, q));
+  public synchronized SimpleCacheEntry cacheBlock(String blockName, byte buf[]) {
+    SimpleCacheEntry sce = new SimpleCacheEntry(buf);
+    cache.put(blockName, new Ref(blockName, sce, q));
+    return sce;
   }
   
-  public synchronized void cacheBlock(String blockName, byte buf[], boolean inMemory) {
-    cache.put(blockName, new Ref(blockName, buf, q));
+  public synchronized SimpleCacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory)
{
+    SimpleCacheEntry sce = new SimpleCacheEntry(buf);
+    cache.put(blockName, new Ref(blockName, sce, q));
+    return sce;
   }
   
   public void shutdown() {
     // noop
   }
+  
+  @Override
+  public long getMaxSize() {
+    return Long.MAX_VALUE;
+  }
 }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
Mon Jul 16 14:53:58 2012
@@ -21,13 +21,14 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.ref.SoftReference;
 
 import org.apache.accumulo.core.file.blockfile.ABlockReader;
 import org.apache.accumulo.core.file.blockfile.ABlockWriter;
 import org.apache.accumulo.core.file.blockfile.BlockFileReader;
 import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
@@ -142,8 +143,8 @@ public class CachableBlockFile {
   public static class Reader implements BlockFileReader {
     private BCFile.Reader _bc;
     private String fileName = "not_available";
-    private LruBlockCache _dCache = null;
-    private LruBlockCache _iCache = null;
+    private BlockCache _dCache = null;
+    private BlockCache _iCache = null;
     private FSDataInputStream fin = null;
     private FileSystem fs;
     private Configuration conf;
@@ -224,12 +225,18 @@ public class CachableBlockFile {
        */
       
       fileName = dataFile.toString();
-      this._dCache = (LruBlockCache) data;
-      this._iCache = (LruBlockCache) index;
+      this._dCache = data;
+      this._iCache = index;
       this.fs = fs;
       this.conf = conf;
     }
     
+    public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data,
BlockCache index) throws IOException {
+      this._dCache = data;
+      this._iCache = index;
+      init(fsin, len, conf);
+    }
+
     public Reader(FSDataInputStream fsin, long len, Configuration conf) throws IOException
{
       // this.fin = fsin;
       init(fsin, len, conf);
@@ -255,13 +262,12 @@ public class CachableBlockFile {
     
     public BlockRead getCachedMetaBlock(String blockName) throws IOException {
       String _lookup = fileName + "M" + blockName;
-      byte b[] = null;
       
       if (_iCache != null) {
-        b = _iCache.getBlock(_lookup);
+        CacheEntry cacheEntry = _iCache.getBlock(_lookup);
         
-        if (b != null) {
-          return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+        if (cacheEntry != null) {
+          return new CachedBlockRead(cacheEntry, cacheEntry.getBuffer());
         }
         
       }
@@ -287,16 +293,16 @@ public class CachableBlockFile {
       }
     }
     
-    private BlockRead getBlock(String _lookup, LruBlockCache cache, BlockLoader loader) throws
IOException {
+    private BlockRead getBlock(String _lookup, BlockCache cache, BlockLoader loader) throws
IOException {
       
       BlockReader _currBlock;
       
       if (cache != null) {
-        byte b[] = null;
-        b = cache.getBlock(_lookup);
+        CacheEntry cb = null;
+        cb = cache.getBlock(_lookup);
         
-        if (b != null) {
-          return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+        if (cb != null) {
+          return new CachedBlockRead(cb, cb.getBuffer());
         }
         
       }
@@ -313,7 +319,7 @@ public class CachableBlockFile {
       
     }
     
-    private BlockRead cacheBlock(String _lookup, LruBlockCache cache, BlockReader _currBlock,
String block) throws IOException {
+    private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock,
String block) throws IOException {
       
       if ((cache == null) || (_currBlock.getRawSize() > cache.getMaxSize())) {
         return new BlockRead(_currBlock, _currBlock.getRawSize());
@@ -334,13 +340,17 @@ public class CachableBlockFile {
           _currBlock.close();
         }
         
+        CacheEntry ce = null;
         try {
-          cache.cacheBlock(_lookup, b);
+          ce = cache.cacheBlock(_lookup, b);
         } catch (Exception e) {
           log.warn("Already cached block: " + _lookup, e);
         }
         
-        return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+        if (ce == null)
+          return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+        else
+          return new CachedBlockRead(ce, ce.getBuffer());
         
       }
     }
@@ -399,6 +409,82 @@ public class CachableBlockFile {
     
   }
   
+  static class SeekableByteArrayInputStream extends ByteArrayInputStream {
+    // A ByteArrayInputStream whose read position can be queried and moved; seek assumes the buffer offset is zero.
+    public SeekableByteArrayInputStream(byte[] buf) {
+      super(buf);
+    }
+    
+    public SeekableByteArrayInputStream(byte buf[], int offset, int length) {
+      super(buf, offset, length);
+      throw new UnsupportedOperationException("Seek code assumes offset is zero"); // do not need this constructor, documenting that seek will not work
+                                                                                  // unless offset is kept track of
+    }
+    
+    public void seek(int position) {
+      if (position < 0 || position >= buf.length) // validate the requested position, not the current pos field
+        throw new IllegalArgumentException("pos = " + position + " buf.lenght = " + buf.length);
+      this.pos = position;
+    }
+    
+    public int getPosition() {
+      return this.pos;
+    }
+    
+  }
+
+  public static class CachedBlockRead extends BlockRead {
+    private SeekableByteArrayInputStream seekableInput;
+    private CacheEntry cb;
+    
+    public CachedBlockRead(CacheEntry cb, byte buf[]) {
+      this(new SeekableByteArrayInputStream(buf), buf.length);
+      this.cb = cb;
+    }
+    
+    private CachedBlockRead(SeekableByteArrayInputStream seekableInput, long size) {
+      super(seekableInput, size);
+      this.seekableInput = seekableInput;
+    }
+
+    @Override
+    public void seek(int position) {
+      seekableInput.seek(position);
+    }
+    
+    @Override
+    public int getPosition() {
+      return seekableInput.getPosition();
+    }
+    
+    @Override
+    public boolean isIndexable() {
+      return true;
+    }
+    
+    @Override
+    public <T> T getIndex(Class<T> clazz) {
+      T bi = null;
+      synchronized (cb) {
+        @SuppressWarnings("unchecked")
+        SoftReference<T> softRef = (SoftReference<T>) cb.getIndex();
+        if (softRef != null)
+          bi = softRef.get();
+        
+        if (bi == null) {
+          try {
+            bi = clazz.newInstance();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          cb.setIndex(new SoftReference<T>(bi));
+        }
+      }
+      
+      return bi;
+    }
+  }
+
   /**
    * 
    * Class provides functionality to read one block from the underlying BCFile Since We are
caching blocks in the Reader class as bytearrays, this class will
@@ -430,5 +516,25 @@ public class CachableBlockFile {
       return this;
     }
     
+    @Override
+    public boolean isIndexable() {
+      return false;
+    }
+    
+    @Override
+    public void seek(int position) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public int getPosition() {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public <T> T getIndex(Class<T> clazz) {
+      throw new UnsupportedOperationException();
+    }
+    
   }
 }

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java?rev=1362072&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
(added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
Mon Jul 16 14:53:58 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
+
+/**
+ * 
+ */
+public class BlockIndex {
+  
+  public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws
IOException {
+    
+    BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
+    
+    int accessCount = blockIndex.accessCount.incrementAndGet();
+    
+    // 1 is a power of two, but do not care about it
+    if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
+      blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
+    }
+    
+    if (blockIndex.blockIndex != null)
+      return blockIndex;
+
+    return null;
+  }
+  
+  private static boolean isPowerOfTwo(int x) {
+    return ((x > 0) && (x & (x - 1)) == 0);
+  }
+  
+  private AtomicInteger accessCount = new AtomicInteger(0);
+  private volatile BlockIndexEntry[] blockIndex = null;
+
+  public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
+    
+    private Key key;
+    private int entriesLeft;
+    private int pos;
+    
+    public BlockIndexEntry(int pos, int entriesLeft, Key key) {
+      this.pos = pos;
+      this.entriesLeft = entriesLeft;
+      this.key = key;
+    }
+
+    /**
+     * @param startKey
+     */
+    public BlockIndexEntry(Key key) {
+      this.key = key;
+    }
+
+    public Key getKey() {
+      return key;
+    }
+    
+    public int getEntriesLeft() {
+      return entriesLeft;
+    }
+
+    @Override
+    public int compareTo(BlockIndexEntry o) {
+      return key.compareTo(o.key);
+    }
+    
+    public String toString() {
+      return key + " " + entriesLeft + " " + pos;
+    }
+  }
+  
+  public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
+    // Seeks cacheBlock to the indexed entry at or before startKey (the earliest one among equal keys) and
+    // returns that entry; returns null without seeking when the caller must scan from the start of the block.
+    // get a local ref to the index, another thread could change it
+    BlockIndexEntry[] blockIndex = this.blockIndex;
+    
+    int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
+
+    int index;
+    
+    if (pos < 0) {
+      if (pos == -1)
+        return null; // less than the first key in index, did not index the first key in block so just return null... code calling this will scan from beginning
+                     // of block
+      index = (pos * -1) - 2;
+    } else {
+      // found exact key in index
+      index = pos;
+    }
+    
+    // handle case where multiple keys in block are exactly the same, want to find the earliest key in the index
+    while (index > 0) { // must also compare against entry 0, otherwise the index == 0 check below is skipped for duplicates
+      if (blockIndex[index].getKey().equals(blockIndex[index - 1].getKey()))
+        index--;
+      else
+        break;
+
+    }
+    
+    // entry 0 equals startKey: equal keys may precede the first indexed entry (the block's first key is never indexed), so scan instead
+    if (index == 0 && blockIndex[index].getKey().equals(startKey))
+      return null;
+
+    BlockIndexEntry bie = blockIndex[index];
+    cacheBlock.seek(bie.pos);
+    return bie;
+  }
+  
+  private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry
indexEntry) throws IOException {
+    cacheBlock.seek(0);
+    
+    RelativeKey rk = new RelativeKey();
+    Value val = new Value();
+    
+    int interval = indexEntry.getNumEntries() / indexEntries;
+    
+    if (interval <= 32)
+      return;
+    
+    // multiple threads could try to create the index with different sizes, do not replace
a large index with a smaller one
+    if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
+      return;
+
+    int count = 0;
+    
+    ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries
- 1);
+
+    while (count < (indexEntry.getNumEntries() - interval + 1)) {
+
+      int pos = cacheBlock.getPosition();
+      rk.readFields(cacheBlock);
+      val.readFields(cacheBlock);
+
+      if (count > 0 && count % interval == 0) {
+        index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey()));
+      }
+      
+      count++;
+    }
+
+    this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]);
+
+    cacheBlock.seek(0);
+  }
+}

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java Mon Jul
16 14:53:58 2012
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.file.blo
 import org.apache.accumulo.core.file.blockfile.BlockFileReader;
 import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
@@ -670,6 +671,12 @@ public class RFile {
         if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey())
<= 0) {
           // start key is within the unconsumed portion of the current block
           
+          // this code intentionally does not use the index associated with a cached block
+          // because if only forward seeks are being done, then there is no benefit to building
+          // an index for the block... could consider using the index if it exists but not
+          // causing the build of an index... doing this could slow down some use cases
+          // and speed up others.
+
           MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
           RelativeKey tmpRk = new RelativeKey();
           Key pKey = new Key(getTopKey());
@@ -717,9 +724,35 @@ public class RFile {
           entriesLeft = indexEntry.getNumEntries();
           currBlock = getDataBlock(indexEntry);
 
-          MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
           RelativeKey tmpRk = new RelativeKey();
-          fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, null);
+          MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+
+          Key currKey = null;
+
+          if (currBlock.isIndexable()) {
+            BlockIndex blockIndex = BlockIndex.getIndex(currBlock, indexEntry);
+            if (blockIndex != null) {
+              BlockIndexEntry bie = blockIndex.seekBlock(startKey, currBlock);
+              if (bie != null) {
+                // we are seeked to the current position of the key in the index
+                // need to prime the read process and read this key from the block
+                tmpRk.setPrevKey(bie.getKey());
+                tmpRk.readFields(currBlock);
+                val = new Value();
+
+                val.readFields(currBlock);
+                valbs = new MByteSequence(val.get(), 0, val.getSize());
+                
+                // just consumed one key from the input stream, so subtract one from entries
left
+                entriesLeft = bie.getEntriesLeft() - 1;
+                prevKey = new Key(bie.getKey());
+                currKey = bie.getKey();
+              }
+            }
+            
+          }
+
+          fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
           entriesLeft -= fastSkipped;
           val = new Value(valbs.toArray());
           // set rk when everything above is successful, if exception
@@ -789,7 +822,7 @@ public class RFile {
     
     private AtomicBoolean interruptFlag;
     
-    Reader(BlockFileReader rdr) throws IOException {
+    public Reader(BlockFileReader rdr) throws IOException {
       this.reader = rdr;
       
       ABlockReader mb = reader.getMetaBlock("RFile.index");

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
(original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
Mon Jul 16 14:53:58 2012
@@ -111,6 +111,10 @@ public class RelativeKey implements Writ
       fieldsSame |= DELETED;
   }
   
+  public void setPrevKey(Key pk) {
+    this.prevKey = pk;
+  }
+  
   @Override
   public void readFields(DataInput in) throws IOException {
     fieldsSame = in.readByte();

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
(original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
Mon Jul 16 14:53:58 2012
@@ -83,9 +83,9 @@ public class TestLruBlockCache extends T
     
     // Check if all blocks are properly cached and retrieved
     for (Block block : blocks) {
-      byte buf1[] = cache.getBlock(block.blockName);
-      assertTrue(buf1 != null);
-      assertEquals(buf1.length, block.buf.length);
+      CacheEntry ce = cache.getBlock(block.blockName);
+      assertTrue(ce != null);
+      assertEquals(ce.getBuffer().length, block.buf.length);
     }
     
     // Verify correctly calculated cache heap size
@@ -93,9 +93,9 @@ public class TestLruBlockCache extends T
     
     // Check if all blocks are properly cached and retrieved
     for (Block block : blocks) {
-      byte buf1[] = cache.getBlock(block.blockName);
-      assertTrue(buf1 != null);
-      assertEquals(buf1.length, block.buf.length);
+      CacheEntry ce = cache.getBlock(block.blockName);
+      assertTrue(ce != null);
+      assertEquals(ce.getBuffer().length, block.buf.length);
     }
     
     // Expect no evictions
@@ -138,7 +138,7 @@ public class TestLruBlockCache extends T
     assertTrue(cache.getBlock(blocks[0].blockName) == null);
     assertTrue(cache.getBlock(blocks[1].blockName) == null);
     for (int i = 2; i < blocks.length; i++) {
-      assertEquals(cache.getBlock(blocks[i].blockName), blocks[i].buf);
+      assertEquals(cache.getBlock(blocks[i].blockName).getBuffer(), blocks[i].buf);
     }
   }
   
@@ -163,7 +163,7 @@ public class TestLruBlockCache extends T
     for (Block block : multiBlocks) {
       cache.cacheBlock(block.blockName, block.buf);
       expectedCacheSize += block.heapSize();
-      assertEquals(cache.getBlock(block.blockName), block.buf);
+      assertEquals(cache.getBlock(block.blockName).getBuffer(), block.buf);
     }
     
     // Add the single blocks (no get)
@@ -196,8 +196,8 @@ public class TestLruBlockCache extends T
     
     // And all others to be cached
     for (int i = 1; i < 4; i++) {
-      assertEquals(cache.getBlock(singleBlocks[i].blockName), singleBlocks[i].buf);
-      assertEquals(cache.getBlock(multiBlocks[i].blockName), multiBlocks[i].buf);
+      assertEquals(cache.getBlock(singleBlocks[i].blockName).getBuffer(), singleBlocks[i].buf);
+      assertEquals(cache.getBlock(multiBlocks[i].blockName).getBuffer(), multiBlocks[i].buf);
     }
   }
   
@@ -429,9 +429,9 @@ public class TestLruBlockCache extends T
     
     // And the newest 5 blocks should still be accessible
     for (int i = 5; i < 10; i++) {
-      assertEquals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName));
-      assertEquals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName));
-      assertEquals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName));
+      assertEquals(singleBlocks[i].buf, cache.getBlock(singleBlocks[i].blockName).getBuffer());
+      assertEquals(multiBlocks[i].buf, cache.getBlock(multiBlocks[i].blockName).getBuffer());
+      assertEquals(memoryBlocks[i].buf, cache.getBlock(memoryBlocks[i].blockName).getBuffer());
     }
   }
   

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java?rev=1362072&r1=1362071&r2=1362072&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java Mon Jul 16 14:53:58 2012
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Random;
 import java.util.Set;
 
 import junit.framework.TestCase;
@@ -37,6 +38,7 @@ import org.apache.accumulo.core.data.Par
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -177,7 +179,11 @@ public class RFileTest extends TestCase 
       byte[] data = baos.toByteArray();
       bais = new SeekableByteArrayInputStream(data);
       in = new FSDataInputStream(bais);
-      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf);
+      
+      LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
+      LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
+      
+      CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, conf, dataCache, indexCache);
       reader = new RFile.Reader(_cbr);
       iter = new ColumnFamilySkippingIterator(reader);
       
@@ -301,10 +307,10 @@ public class RFileTest extends TestCase 
         }
       }
     }
-    
+		
     // trf.writer.append(nk("r1","cf1","cq1","L1", 55), nv("foo"));
     trf.closeWriter();
-    
+
     trf.openReader();
     // seek before everything
     trf.iter.seek(new Range((Key) null, null), EMPTY_COL_FAMS, false);
@@ -384,6 +390,20 @@ public class RFileTest extends TestCase 
     
     assertEquals(expectedKeys.get(expectedKeys.size() - 1), trf.reader.getLastKey());
     
+    // test seeking to random location and reading all data from that point
+    // there was an off by one bug with this in the transient index
+    Random rand = new Random();
+    for (int i = 0; i < 12; i++) {
+      index = rand.nextInt(expectedKeys.size());
+      trf.seek(expectedKeys.get(index));
+      for (; index < expectedKeys.size(); index++) {
+        assertTrue(trf.iter.hasTop());
+        assertEquals(expectedKeys.get(index), trf.iter.getTopKey());
+        assertEquals(expectedValues.get(index), trf.iter.getTopValue());
+        trf.iter.next();
+      }
+    }
+
     trf.closeReader();
   }
   
@@ -1203,7 +1223,7 @@ public class RFileTest extends TestCase 
     assertFalse(trf.iter.hasTop());
     
     trf.iter.seek(new Range(nk("r0000", "cf1", "cq1", "", 1), false, nk("r0001", "cf1", "cq1", "", 1), true), EMPTY_COL_FAMS, false);
-    
+		
     for (int i = 2048; i < 4096; i++) {
       assertTrue(trf.iter.hasTop());
       assertEquals(nk("r0001", "cf1", "cq1", "", 1), trf.iter.getTopKey());



Mime
View raw message