accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [27/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 21:59:52 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
new file mode 100644
index 0000000..54bc701
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+/**
+ * Block cache interface. TODO: Add filename or hash of filename to block cache key.
+ */
+public interface BlockCache {
+  /**
+   * Add block to cache.
+   * 
+   * @param blockName
+   *          Name that uniquely identifies the block (used as the cache key).
+   * @param buf
+   *          The block contents as a byte array.
+   * @param inMemory
+   *          Whether block should be treated as in-memory
+   */
+  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory);
+  
+  /**
+   * Add block to cache (defaults to not in-memory).
+   * 
+   * @param blockName
+   *          Name that uniquely identifies the block (used as the cache key).
+   * @param buf
+   *          The block contents as a byte array.
+   */
+  public CacheEntry cacheBlock(String blockName, byte buf[]);
+  
+  /**
+   * Fetch block from cache.
+   * 
+   * @param blockName
+   *          Name of the block to fetch.
+   * @return Block or null if block is not in the cache.
+   */
+  public CacheEntry getBlock(String blockName);
+  
+  /**
+   * Shutdown the cache.
+   */
+  public void shutdown();
+  
+  /**
+   * Get the maximum size of this cache.
+   *
+   * @return max size in bytes
+   */
+  public long getMaxSize();
+}

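To make the contract above concrete, here is a hedged usage sketch. LruBlockCache and its constructor come from later in this commit; the block name, the sizes, and the readBlockFromFile() helper are invented for illustration.

    // Illustrative only: sizes, names, and readBlockFromFile() are assumptions.
    BlockCache cache = new LruBlockCache(8 * 1024 * 1024, 64 * 1024); // ~8MB cache, ~64KB blocks
    byte[] block = readBlockFromFile();            // hypothetical helper, not part of this commit
    cache.cacheBlock("myfile:0", block);           // cached at single-access priority by default
    CacheEntry entry = cache.getBlock("myfile:0"); // null on a cache miss
    if (entry != null) {
      byte[] buf = entry.getBuffer();
      // ... decode the block from buf ...
    }
    cache.shutdown();
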
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
new file mode 100644
index 0000000..609e374
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+public interface CacheEntry {
+  byte[] getBuffer();
+  
+  public Object getIndex();
+  
+  public void setIndex(Object idx);
+  
+}

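For orientation, a minimal implementation of this interface only needs to hold the buffer and an opaque index slot. A hedged sketch follows (the SimpleCacheEntry later in this commit is the real in-tree example; the name here is invented):

    // Minimal illustrative CacheEntry implementation.
    class ByteArrayCacheEntry implements CacheEntry {
      private final byte[] buffer;
      private Object index; // opaque slot for a caller-managed decoded index

      ByteArrayCacheEntry(byte[] buffer) {
        this.buffer = buffer;
      }

      @Override
      public byte[] getBuffer() {
        return buffer;
      }

      @Override
      public Object getIndex() {
        return index;
      }

      @Override
      public void setIndex(Object idx) {
        this.index = idx;
      }
    }
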
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
new file mode 100644
index 0000000..486bf1e
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+
+/**
+ * Represents an entry in the {@link LruBlockCache}.
+ * 
+ * <p>
+ * Makes the block memory-aware with {@link HeapSize} and Comparable to sort by access time for the LRU. It also takes care of priority by either instantiating
+ * as in-memory or handling the transition from single to multiple access.
+ */
+public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntry {
+  
+  public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG)
+      + ClassSize.STRING + ClassSize.BYTE_BUFFER);
+  
+  static enum BlockPriority {
+    /**
+     * Accessed a single time (used for scan-resistance)
+     */
+    SINGLE,
+    /**
+     * Accessed multiple times
+     */
+    MULTI,
+    /**
+     * Block from in-memory store
+     */
+    MEMORY
+  }
+  
+  private final String blockName;
+  private final byte buf[];
+  private volatile long accessTime;
+  private long size;
+  private BlockPriority priority;
+  private Object index;
+  
+  public CachedBlock(String blockName, byte buf[], long accessTime) {
+    this(blockName, buf, accessTime, false);
+  }
+  
+  public CachedBlock(String blockName, byte buf[], long accessTime, boolean inMemory) {
+    this.blockName = blockName;
+    this.buf = buf;
+    this.accessTime = accessTime;
+    this.size = ClassSize.align(blockName.length()) + ClassSize.align(buf.length) + PER_BLOCK_OVERHEAD;
+    if (inMemory) {
+      this.priority = BlockPriority.MEMORY;
+    } else {
+      this.priority = BlockPriority.SINGLE;
+    }
+  }
+  
+  /**
+   * Block has been accessed. Update its local access time.
+   */
+  public void access(long accessTime) {
+    this.accessTime = accessTime;
+    if (this.priority == BlockPriority.SINGLE) {
+      this.priority = BlockPriority.MULTI;
+    }
+  }
+  
+  public long heapSize() {
+    return size;
+  }
+  
+  public int compareTo(CachedBlock that) {
+    if (this.accessTime == that.accessTime)
+      return 0;
+    return this.accessTime < that.accessTime ? 1 : -1;
+  }
+  
+  @Override
+  public byte[] getBuffer() {
+    return this.buf;
+  }
+  
+  public String getName() {
+    return this.blockName;
+  }
+  
+  public BlockPriority getPriority() {
+    return this.priority;
+  }
+  
+  @Override
+  public Object getIndex() {
+    return index;
+  }
+  
+  @Override
+  public void setIndex(Object idx) {
+    this.index = idx;
+  }
+}

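A short sketch of the priority transitions described in the class javadoc; the access-time values are arbitrary monotonic counters, as LruBlockCache supplies them.

    CachedBlock cb = new CachedBlock("myfile:0", new byte[1024], 1L);
    // a block constructed without the inMemory flag starts at SINGLE priority
    assert cb.getPriority() == CachedBlock.BlockPriority.SINGLE;

    cb.access(2L); // a second access promotes it
    assert cb.getPriority() == CachedBlock.BlockPriority.MULTI;

    CachedBlock inMem = new CachedBlock("myfile:1", new byte[1024], 3L, true);
    inMem.access(4L); // MEMORY priority blocks stay MEMORY regardless of accesses
    assert inMem.getPriority() == CachedBlock.BlockPriority.MEMORY;
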
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java
new file mode 100644
index 0000000..ee43c3e
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlockQueue.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.util.LinkedList;
+import java.util.PriorityQueue;
+
+/**
+ * A memory-bound queue that will grow until an element brings total size >= maxSize. From then on, only entries that are sorted larger than the smallest
+ * current entry will be inserted/replaced.
+ * 
+ * <p>
+ * Use this when you want to find the largest elements (according to their ordering, not their heap size) that consume as close to the specified maxSize as
+ * possible. Default behavior is to grow just above rather than just below specified max.
+ * 
+ * <p>
+ * Object used in this queue must implement {@link HeapSize} as well as {@link Comparable}.
+ */
+public class CachedBlockQueue implements HeapSize {
+  
+  private PriorityQueue<CachedBlock> queue;
+  
+  private long heapSize;
+  private long maxSize;
+  
+  /**
+   * @param maxSize
+   *          the target size of elements in the queue
+   * @param blockSize
+   *          expected average size of blocks
+   */
+  public CachedBlockQueue(long maxSize, long blockSize) {
+    int initialSize = (int) Math.ceil(maxSize / (double) blockSize);
+    if (initialSize == 0)
+      initialSize++;
+    queue = new PriorityQueue<CachedBlock>(initialSize);
+    heapSize = 0;
+    this.maxSize = maxSize;
+  }
+  
+  /**
+   * Attempt to add the specified cached block to this queue.
+   * 
+   * <p>
+   * If the queue is smaller than the max size, or if the specified element is ordered before the smallest element in the queue, the element will be added to
+   * the queue. Otherwise, there is no side effect of this call.
+   * 
+   * @param cb
+   *          block to try to add to the queue
+   */
+  public void add(CachedBlock cb) {
+    if (heapSize < maxSize) {
+      queue.add(cb);
+      heapSize += cb.heapSize();
+    } else {
+      CachedBlock head = queue.peek();
+      if (cb.compareTo(head) > 0) {
+        heapSize += cb.heapSize();
+        heapSize -= head.heapSize();
+        if (heapSize > maxSize) {
+          queue.poll();
+        } else {
+          heapSize += head.heapSize();
+        }
+        queue.add(cb);
+      }
+    }
+  }
+  
+  /**
+   * Get all elements in this queue, sorted in descending order.
+   * 
+   * @return array of cached elements in descending order
+   */
+  public CachedBlock[] get() {
+    LinkedList<CachedBlock> blocks = new LinkedList<CachedBlock>();
+    while (!queue.isEmpty()) {
+      blocks.addFirst(queue.poll());
+    }
+    return blocks.toArray(new CachedBlock[blocks.size()]);
+  }
+  
+  /**
+   * Get a sorted List of all elements in this queue, in descending order.
+   * 
+   * @return list of cached elements in descending order
+   */
+  public LinkedList<CachedBlock> getList() {
+    LinkedList<CachedBlock> blocks = new LinkedList<CachedBlock>();
+    while (!queue.isEmpty()) {
+      blocks.addFirst(queue.poll());
+    }
+    return blocks;
+  }
+  
+  /**
+   * Total size of all elements in this queue.
+   * 
+   * @return size of all elements currently in queue, in bytes
+   */
+  public long heapSize() {
+    return heapSize;
+  }
+}

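A hedged sketch of the intended use during eviction (names and sizes invented): because CachedBlock orders older access times as "larger", a full queue retains the least-recently-used blocks, and get() returns them LRU-first.

    // Target ~2KB of eviction candidates from ~1KB blocks.
    CachedBlockQueue q = new CachedBlockQueue(2 * 1024, 1024);
    q.add(new CachedBlock("a", new byte[1024], 1L)); // oldest access time
    q.add(new CachedBlock("b", new byte[1024], 2L));
    q.add(new CachedBlock("c", new byte[1024], 3L)); // newest; dropped, queue is already full

    CachedBlock[] candidates = q.get();
    // candidates[0] is "a" and candidates[1] is "b": least recently used first,
    // which is the order LruBlockCache's eviction frees them in
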
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java
new file mode 100644
index 0000000..986f52a
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/ClassSize.java
@@ -0,0 +1,275 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Class for determining the "size" of a class: an attempt to calculate the actual bytes that an object of this class will occupy in memory.
+ * 
+ * The core of this class is taken from the Derby project.
+ */
+public class ClassSize {
+  static final Log LOG = LogFactory.getLog(ClassSize.class);
+  
+  private static int nrOfRefsPerObj = 2;
+  
+  /** Array overhead */
+  public static int ARRAY = 0;
+  
+  /** Overhead for ArrayList(0) */
+  public static int ARRAYLIST = 0;
+  
+  /** Overhead for ByteBuffer */
+  public static int BYTE_BUFFER = 0;
+  
+  /** Overhead for an Integer */
+  public static int INTEGER = 0;
+  
+  /** Overhead for entry in map */
+  public static int MAP_ENTRY = 0;
+  
+  /** Object overhead is minimum 2 * reference size (8 bytes on 64-bit) */
+  public static int OBJECT = 0;
+  
+  /** Reference size is 8 bytes on 64-bit, 4 bytes on 32-bit */
+  public static int REFERENCE = 0;
+  
+  /** String overhead */
+  public static int STRING = 0;
+  
+  /** Overhead for TreeMap */
+  public static int TREEMAP = 0;
+  
+  /** Overhead for ConcurrentHashMap */
+  public static int CONCURRENT_HASHMAP = 0;
+  
+  /** Overhead for ConcurrentHashMap.Entry */
+  public static int CONCURRENT_HASHMAP_ENTRY = 0;
+  
+  /** Overhead for ConcurrentHashMap.Segment */
+  public static int CONCURRENT_HASHMAP_SEGMENT = 0;
+  
+  /** Overhead for ConcurrentSkipListMap */
+  public static int CONCURRENT_SKIPLISTMAP = 0;
+  
+  /** Overhead for ConcurrentSkipListMap Entry */
+  public static int CONCURRENT_SKIPLISTMAP_ENTRY = 0;
+  
+  /** Overhead for ReentrantReadWriteLock */
+  public static int REENTRANT_LOCK = 0;
+  
+  /** Overhead for AtomicLong */
+  public static int ATOMIC_LONG = 0;
+  
+  /** Overhead for AtomicInteger */
+  public static int ATOMIC_INTEGER = 0;
+  
+  /** Overhead for AtomicBoolean */
+  public static int ATOMIC_BOOLEAN = 0;
+  
+  /** Overhead for CopyOnWriteArraySet */
+  public static int COPYONWRITE_ARRAYSET = 0;
+  
+  /** Overhead for CopyOnWriteArrayList */
+  public static int COPYONWRITE_ARRAYLIST = 0;
+  
+  private static final String THIRTY_TWO = "32";
+  
+  /**
+   * Static initializer that reads the architecture setting and sets overheads according to 32-bit or 64-bit architecture.
+   */
+  static {
+    // Figure out whether this is a 32 or 64 bit machine.
+    Properties sysProps = System.getProperties();
+    String arcModel = sysProps.getProperty("sun.arch.data.model");
+    
+    // Default value is set to 8, covering the case when arcModel is unknown
+    REFERENCE = 8;
+    // Compare against the constant so a missing property (null arcModel) keeps the default
+    if (THIRTY_TWO.equals(arcModel)) {
+      REFERENCE = 4;
+    }
+    
+    OBJECT = 2 * REFERENCE;
+    
+    ARRAY = 3 * REFERENCE;
+    
+    ARRAYLIST = align(OBJECT + align(REFERENCE) + align(ARRAY) + (2 * SizeConstants.SIZEOF_INT));
+    
+    BYTE_BUFFER = align(OBJECT + align(REFERENCE) + align(ARRAY) + (5 * SizeConstants.SIZEOF_INT) + (3 * SizeConstants.SIZEOF_BOOLEAN)
+        + SizeConstants.SIZEOF_LONG);
+    
+    INTEGER = align(OBJECT + SizeConstants.SIZEOF_INT);
+    
+    MAP_ENTRY = align(OBJECT + 5 * REFERENCE + SizeConstants.SIZEOF_BOOLEAN);
+    
+    TREEMAP = align(OBJECT + (2 * SizeConstants.SIZEOF_INT) + align(7 * REFERENCE));
+    
+    STRING = align(OBJECT + ARRAY + REFERENCE + 3 * SizeConstants.SIZEOF_INT);
+    
+    CONCURRENT_HASHMAP = align((2 * SizeConstants.SIZEOF_INT) + ARRAY + (6 * REFERENCE) + OBJECT);
+    
+    CONCURRENT_HASHMAP_ENTRY = align(REFERENCE + OBJECT + (3 * REFERENCE) + (2 * SizeConstants.SIZEOF_INT));
+    
+    CONCURRENT_HASHMAP_SEGMENT = align(REFERENCE + OBJECT + (3 * SizeConstants.SIZEOF_INT) + SizeConstants.SIZEOF_FLOAT + ARRAY);
+    
+    CONCURRENT_SKIPLISTMAP = align(SizeConstants.SIZEOF_INT + OBJECT + (8 * REFERENCE));
+    
+    CONCURRENT_SKIPLISTMAP_ENTRY = align(align(OBJECT + (3 * REFERENCE)) /* one node per entry */
+        + align((OBJECT + (3 * REFERENCE)) / 2)); /* one index per two entries */
+    
+    REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE));
+    
+    ATOMIC_LONG = align(OBJECT + SizeConstants.SIZEOF_LONG);
+    
+    ATOMIC_INTEGER = align(OBJECT + SizeConstants.SIZEOF_INT);
+    
+    ATOMIC_BOOLEAN = align(OBJECT + SizeConstants.SIZEOF_BOOLEAN);
+    
+    COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE);
+    
+    COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);
+  }
+  
+  /**
+   * The estimate of the size of a class instance depends on whether the JVM uses 32- or 64-bit addresses, that is, on the size of an object reference. It is
+   * a linear function of the size of a reference, e.g. 24 + 5*r where r is the size of a reference (usually 4 or 8 bytes).
+   * 
+   * This method returns the coefficients of the linear function, e.g. {24, 5} in the above example.
+   * 
+   * @param cl
+   *          A class whose instance size is to be estimated
+   * @return an array of 3 integers. The first integer is the size of the primitives, the second the number of arrays and the third the number of references.
+   */
+  private static int[] getSizeCoefficients(Class<?> cl, boolean debug) {
+    int primitives = 0;
+    int arrays = 0;
+    // The number of references that a new object takes
+    int references = nrOfRefsPerObj;
+    
+    for (; null != cl; cl = cl.getSuperclass()) {
+      Field[] field = cl.getDeclaredFields();
+      if (null != field) {
+        for (int i = 0; i < field.length; i++) {
+          if (!Modifier.isStatic(field[i].getModifiers())) {
+            Class<?> fieldClass = field[i].getType();
+            if (fieldClass.isArray()) {
+              arrays++;
+              references++;
+            } else if (!fieldClass.isPrimitive()) {
+              references++;
+            } else { // is a simple primitive
+              String name = fieldClass.getName();
+              
+              if (name.equals("int") || name.equals("I"))
+                primitives += SizeConstants.SIZEOF_INT;
+              else if (name.equals("long") || name.equals("J"))
+                primitives += SizeConstants.SIZEOF_LONG;
+              else if (name.equals("boolean") || name.equals("Z"))
+                primitives += SizeConstants.SIZEOF_BOOLEAN;
+              else if (name.equals("short") || name.equals("S"))
+                primitives += SizeConstants.SIZEOF_SHORT;
+              else if (name.equals("byte") || name.equals("B"))
+                primitives += SizeConstants.SIZEOF_BYTE;
+              else if (name.equals("char") || name.equals("C"))
+                primitives += SizeConstants.SIZEOF_CHAR;
+              else if (name.equals("float") || name.equals("F"))
+                primitives += SizeConstants.SIZEOF_FLOAT;
+              else if (name.equals("double") || name.equals("D"))
+                primitives += SizeConstants.SIZEOF_DOUBLE;
+            }
+            if (debug) {
+              if (LOG.isDebugEnabled()) {
+                // Write out region name as string and its encoded name.
+                LOG.debug(field[i].getName() + "\n\t" + field[i].getType());
+              }
+            }
+          }
+        }
+      }
+    }
+    return new int[] {primitives, arrays, references};
+  }
+  
+  /**
+   * Estimate the static space taken up by a class instance given the coefficients returned by getSizeCoefficients.
+   * 
+   * @param coeff
+   *          the coefficients
+   * 
+   * @return the size estimate, in bytes
+   */
+  private static long estimateBaseFromCoefficients(int[] coeff, boolean debug) {
+    long size = coeff[0] + align(coeff[1] * ARRAY) + coeff[2] * REFERENCE;
+    
+    // Round up to a multiple of 8
+    size = align(size);
+    if (debug) {
+      if (LOG.isDebugEnabled()) {
+        // Write out region name as string and its encoded name.
+        LOG.debug("Primitives " + coeff[0] + ", arrays " + coeff[1] + ", references(includes " + nrOfRefsPerObj + " for object overhead) " + coeff[2]
+            + ", refSize " + REFERENCE + ", size " + size);
+      }
+    }
+    return size;
+  }
+  
+  /**
+   * Estimate the static space taken up by the fields of a class. This includes the space taken up by references (the pointer) but not by the referenced
+   * object. So the estimated size of an array field does not depend on the size of the array. Similarly the size of an object (reference) field does not depend
+   * on the object.
+   * 
+   * @return the size estimate in bytes.
+   */
+  public static long estimateBase(Class<?> cl, boolean debug) {
+    return estimateBaseFromCoefficients(getSizeCoefficients(cl, debug), debug);
+  }
+  
+  /**
+   * Aligns a number to 8.
+   * 
+   * @param num
+   *          number to align to 8
+   * @return smallest number >= input that is a multiple of 8
+   */
+  public static int align(int num) {
+    return (int) (align((long) num));
+  }
+  
+  /**
+   * Aligns a number to 8.
+   * 
+   * @param num
+   *          number to align to 8
+   * @return smallest number >= input that is a multiple of 8
+   */
+  public static long align(long num) {
+    // The 7 comes from the alignment size being 8: adding 7 and clearing the
+    // low three bits rounds num up to the next multiple of 8
+    return ((num + 7) >> 3) << 3;
+  }
+  
+}

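To ground the arithmetic, a worked example under the assumption of a 64-bit JVM (REFERENCE = 8, so OBJECT = 16 and ARRAY = 24); the Sample class is invented and assumed to live in the same package as ClassSize.

    // align() rounds up to the next multiple of 8:
    //   ClassSize.align(13L) -> 16
    //   ClassSize.align(24L) -> 24 (already aligned)

    public class Sample {
      int x;      // 4 bytes of primitives
      Object ref; // 1 reference field

      public static void main(String[] args) {
        // getSizeCoefficients counts 4 primitive bytes and 3 references
        // (2 for object overhead + 1 field), so on a 64-bit JVM:
        // estimateBase = align(4 + 3 * 8) = align(28) = 32
        System.out.println(ClassSize.estimateBase(Sample.class, false));
      }
    }
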
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java
new file mode 100644
index 0000000..70bbaaa
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/HeapSize.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+/**
+ * Implementations can be asked for an estimate of their size in bytes.
+ * <p>
+ * Useful for sizing caches. It's a given that implementation approximations do not account for 32- versus 64-bit JVMs, nor for different VM implementations.
+ * <p>
+ * An Object's size is determined by the non-static data members in it, as well as the fixed {@link Object} overhead.
+ * <p>
+ * For example:
+ * 
+ * <pre>
+ * public class SampleObject implements HeapSize {
+ *   
+ *   int[] numbers;
+ *   int x;
+ *   
+ *   public long heapSize() {
+ *     // rough estimate: object overhead, one array reference, one int, aligned to 8
+ *     return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE + SizeConstants.SIZEOF_INT);
+ *   }
+ * }
+ * </pre>
+ */
+public interface HeapSize {
+  /**
+   * @return Approximate 'exclusive deep size' of the implementing object, including the payload and the hosting object itself.
+   */
+  public long heapSize();
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
new file mode 100644
index 0000000..56de2d2
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
@@ -0,0 +1,685 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.lang.ref.WeakReference;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a
+ * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations.
+ * <p>
+ * 
+ * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary; otherwise
+ * a block starts at single-access priority. Once a block is accessed again, it is promoted to multiple-access priority. This is used to prevent scans from
+ * thrashing the cache, adding a least-frequently-used element to the eviction algorithm.
+ * <p>
+ * 
+ * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size; however,
+ * if any priority is not using its entire chunk, the others are able to grow beyond their chunk size.
+ * <p>
+ * 
+ * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is
+ * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map.
+ * <p>
+ * 
+ * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and
+ * control the eviction thread.
+ * <p>
+ * 
+ * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size
+ * specified.
+ * <p>
+ * 
+ * Eviction happens in a separate thread and involves a single full scan of the map. It determines how many bytes must be freed to reach the minimum size, and
+ * while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (in the worst case, three times the bytes to
+ * free). It then uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
+ */
+public class LruBlockCache implements BlockCache, HeapSize {
+  
+  static final Log LOG = LogFactory.getLog(LruBlockCache.class);
+  
+  /** Default Configuration Parameters */
+  
+  /** Backing Concurrent Map Configuration */
+  static final float DEFAULT_LOAD_FACTOR = 0.75f;
+  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+  
+  /** Eviction thresholds */
+  static final float DEFAULT_MIN_FACTOR = 0.75f;
+  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
+  
+  /** Priority buckets */
+  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
+  static final float DEFAULT_MULTI_FACTOR = 0.50f;
+  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
+  
+  /** Statistics thread */
+  static final int statThreadPeriod = 60;
+  
+  /** Concurrent map (the cache) */
+  private final ConcurrentHashMap<String,CachedBlock> map;
+  
+  /** Eviction lock (locked when eviction in process) */
+  private final ReentrantLock evictionLock = new ReentrantLock(true);
+  
+  /** Volatile boolean to track if we are in an eviction process or not */
+  private volatile boolean evictionInProgress = false;
+  
+  /** Eviction thread */
+  private final EvictionThread evictionThread;
+  
+  /** Statistics thread schedule pool (for heavy debugging, could remove) */
+  private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats"));
+  
+  /** Current size of cache */
+  private final AtomicLong size;
+  
+  /** Current number of cached elements */
+  private final AtomicLong elements;
+  
+  /** Cache access count (sequential ID) */
+  private final AtomicLong count;
+  
+  /** Cache statistics */
+  private final CacheStats stats;
+  
+  /** Maximum allowable size of cache (a put that pushes size past the acceptable level triggers eviction) */
+  private long maxSize;
+  
+  /** Approximate block size */
+  private long blockSize;
+  
+  /** Acceptable size of cache (no evictions if size < acceptable) */
+  private float acceptableFactor;
+  
+  /** Minimum threshold of cache (when evicting, evict until size < min) */
+  private float minFactor;
+  
+  /** Single access bucket size */
+  private float singleFactor;
+  
+  /** Multiple access bucket size */
+  private float multiFactor;
+  
+  /** In-memory bucket size */
+  private float memoryFactor;
+  
+  /** Overhead of the structure itself */
+  private long overhead;
+  
+  /**
+   * Default constructor. Specify maximum size and expected average block size (approximation is fine).
+   * 
+   * <p>
+   * All other factors will be calculated based on defaults specified in this class.
+   * 
+   * @param maxSize
+   *          maximum size of cache, in bytes
+   * @param blockSize
+   *          approximate size of each block, in bytes
+   */
+  public LruBlockCache(long maxSize, long blockSize) {
+    this(maxSize, blockSize, true);
+  }
+  
+  /**
+   * Constructor used for testing. Allows disabling of the eviction thread.
+   */
+  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
+    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR,
+        DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR);
+  }
+  
+  /**
+   * Configurable constructor. Use this constructor if not using defaults.
+   * 
+   * @param maxSize
+   *          maximum size of this cache, in bytes
+   * @param blockSize
+   *          expected average size of blocks, in bytes
+   * @param evictionThread
+   *          whether to run evictions in a background thread
+   * @param mapInitialSize
+   *          initial size of backing ConcurrentHashMap
+   * @param mapLoadFactor
+   *          initial load factor of backing ConcurrentHashMap
+   * @param mapConcurrencyLevel
+   *          initial concurrency factor for backing CHM
+   * @param minFactor
+   *          percentage of total size that eviction will evict until
+   * @param acceptableFactor
+   *          percentage of total size that triggers eviction
+   * @param singleFactor
+   *          percentage of total size for single-access blocks
+   * @param multiFactor
+   *          percentage of total size for multiple-access blocks
+   * @param memoryFactor
+   *          percentage of total size for in-memory blocks
+   */
+  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor,
+      float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor) {
+    if (singleFactor + multiFactor + memoryFactor != 1) {
+      throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0");
+    }
+    if (minFactor >= acceptableFactor) {
+      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
+    }
+    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
+      throw new IllegalArgumentException("all factors must be < 1");
+    }
+    this.maxSize = maxSize;
+    this.blockSize = blockSize;
+    map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
+    this.minFactor = minFactor;
+    this.acceptableFactor = acceptableFactor;
+    this.singleFactor = singleFactor;
+    this.multiFactor = multiFactor;
+    this.memoryFactor = memoryFactor;
+    this.stats = new CacheStats();
+    this.count = new AtomicLong(0);
+    this.elements = new AtomicLong(0);
+    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
+    this.size = new AtomicLong(this.overhead);
+    
+    if (evictionThread) {
+      this.evictionThread = new EvictionThread(this);
+      this.evictionThread.start();
+      while (!this.evictionThread.running()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    } else {
+      this.evictionThread = null;
+    }
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+  }
+  
+  public void setMaxSize(long maxSize) {
+    this.maxSize = maxSize;
+    if (this.size.get() > acceptableSize() && !evictionInProgress) {
+      runEviction();
+    }
+  }
+  
+  // BlockCache implementation
+  
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * It is assumed this will NEVER be called on an already cached block. If it is, the call is treated as a reinsertion of the same block due to a race
+   * condition: the access is counted as a duplicate read, and the size of the cache is not modified.
+   * 
+   * @param blockName
+   *          block name
+   * @param buf
+   *          block buffer
+   * @param inMemory
+   *          if block is in-memory
+   */
+  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
+    CachedBlock cb = map.get(blockName);
+    if (cb != null) {
+      stats.duplicateReads();
+      cb.access(count.incrementAndGet());
+      
+    } else {
+      cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
+      long newSize = size.addAndGet(cb.heapSize());
+      map.put(blockName, cb);
+      elements.incrementAndGet();
+      if (newSize > acceptableSize() && !evictionInProgress) {
+        runEviction();
+      }
+    }
+    
+    return cb;
+  }
+  
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * It is assumed this will NEVER be called on an already cached block. If it is, the call is treated as a reinsertion of the same block due to a race
+   * condition: the access is counted as a duplicate read, and the size of the cache is not modified.
+   * 
+   * @param blockName
+   *          block name
+   * @param buf
+   *          block buffer
+   */
+  public CacheEntry cacheBlock(String blockName, byte buf[]) {
+    return cacheBlock(blockName, buf, false);
+  }
+  
+  /**
+   * Get the block with the specified name.
+   * 
+   * @param blockName
+   *          block name
+   * @return the cached block, or null if not in the cache
+   */
+  public CachedBlock getBlock(String blockName) {
+    CachedBlock cb = map.get(blockName);
+    if (cb == null) {
+      stats.miss();
+      return null;
+    }
+    stats.hit();
+    cb.access(count.incrementAndGet());
+    return cb;
+  }
+  
+  protected long evictBlock(CachedBlock block) {
+    map.remove(block.getName());
+    size.addAndGet(-1 * block.heapSize());
+    elements.decrementAndGet();
+    stats.evicted();
+    return block.heapSize();
+  }
+  
+  /**
+   * Trigger the eviction process. Safe to call from multiple threads; delegates to the eviction thread when one is running.
+   */
+  private void runEviction() {
+    if (evictionThread == null) {
+      evict();
+    } else {
+      evictionThread.evict();
+    }
+  }
+  
+  /**
+   * Eviction method.
+   */
+  void evict() {
+    
+    // Ensure only one eviction at a time
+    if (!evictionLock.tryLock())
+      return;
+    
+    try {
+      evictionInProgress = true;
+      
+      long bytesToFree = size.get() - minSize();
+      
+      LOG.debug("Block cache LRU eviction started.  Attempting to free " + bytesToFree + " bytes");
+      
+      if (bytesToFree <= 0)
+        return;
+      
+      // Instantiate priority buckets
+      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
+      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
+      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
+      
+      // Scan entire map putting into appropriate buckets
+      for (CachedBlock cachedBlock : map.values()) {
+        switch (cachedBlock.getPriority()) {
+          case SINGLE: {
+            bucketSingle.add(cachedBlock);
+            break;
+          }
+          case MULTI: {
+            bucketMulti.add(cachedBlock);
+            break;
+          }
+          case MEMORY: {
+            bucketMemory.add(cachedBlock);
+            break;
+          }
+        }
+      }
+      
+      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3);
+      
+      bucketQueue.add(bucketSingle);
+      bucketQueue.add(bucketMulti);
+      bucketQueue.add(bucketMemory);
+      
+      int remainingBuckets = 3;
+      long bytesFreed = 0;
+      
+      BlockBucket bucket;
+      while ((bucket = bucketQueue.poll()) != null) {
+        long overflow = bucket.overflow();
+        if (overflow > 0) {
+          long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
+          bytesFreed += bucket.free(bucketBytesToFree);
+        }
+        remainingBuckets--;
+      }
+      
+      float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024));
+      float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024));
+      float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024));
+      
+      LOG.debug("Block cache LRU eviction completed. " + "Freed " + bytesFreed + " bytes.  " + "Priority Sizes: " + "Single=" + singleMB + "MB ("
+          + bucketSingle.totalSize() + "), " + "Multi=" + multiMB + "MB (" + bucketMulti.totalSize() + "), " + "Memory=" + memoryMB + "MB ("
+          + bucketMemory.totalSize() + ")");
+      
+    } finally {
+      stats.evict();
+      evictionInProgress = false;
+      evictionLock.unlock();
+    }
+  }
+  
+  /**
+   * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm
+   * takes the appropriate number of elements out of each according to configuration parameters and their relative sizes.
+   */
+  private class BlockBucket implements Comparable<BlockBucket> {
+    
+    private CachedBlockQueue queue;
+    private long totalSize = 0;
+    private long bucketSize;
+    
+    public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
+      this.bucketSize = bucketSize;
+      queue = new CachedBlockQueue(bytesToFree, blockSize);
+      totalSize = 0;
+    }
+    
+    public void add(CachedBlock block) {
+      totalSize += block.heapSize();
+      queue.add(block);
+    }
+    
+    public long free(long toFree) {
+      CachedBlock[] blocks = queue.get();
+      long freedBytes = 0;
+      for (int i = 0; i < blocks.length; i++) {
+        freedBytes += evictBlock(blocks[i]);
+        if (freedBytes >= toFree) {
+          return freedBytes;
+        }
+      }
+      return freedBytes;
+    }
+    
+    public long overflow() {
+      return totalSize - bucketSize;
+    }
+    
+    public long totalSize() {
+      return totalSize;
+    }
+    
+    public int compareTo(BlockBucket that) {
+      if (this.overflow() == that.overflow())
+        return 0;
+      return this.overflow() > that.overflow() ? 1 : -1;
+    }
+  }
+  
+  /**
+   * Get the maximum size of this cache.
+   * 
+   * @return max size in bytes
+   */
+  public long getMaxSize() {
+    return this.maxSize;
+  }
+  
+  /**
+   * Get the current size of this cache.
+   * 
+   * @return current size in bytes
+   */
+  public long getCurrentSize() {
+    return this.size.get();
+  }
+  
+  /**
+   * Get the amount of free space remaining in this cache.
+   * 
+   * @return free space in bytes
+   */
+  public long getFreeSize() {
+    return getMaxSize() - getCurrentSize();
+  }
+  
+  /**
+   * Get the size of this cache (number of cached blocks)
+   * 
+   * @return number of cached blocks
+   */
+  public long size() {
+    return this.elements.get();
+  }
+  
+  /**
+   * Get the number of eviction runs that have occurred
+   */
+  public long getEvictionCount() {
+    return this.stats.getEvictionCount();
+  }
+  
+  /**
+   * Get the number of blocks that have been evicted during the lifetime of this cache.
+   */
+  public long getEvictedCount() {
+    return this.stats.getEvictedCount();
+  }
+  
+  /*
+   * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level.<p>
+   * 
+   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
+   */
+  private static class EvictionThread extends Thread {
+    private WeakReference<LruBlockCache> cache;
+    private boolean running = false;
+    
+    public EvictionThread(LruBlockCache cache) {
+      super("LruBlockCache.EvictionThread");
+      setDaemon(true);
+      this.cache = new WeakReference<LruBlockCache>(cache);
+    }
+    
+    public synchronized boolean running() {
+      return running;
+    }
+    
+    @Override
+    public void run() {
+      while (true) {
+        synchronized (this) {
+          running = true;
+          try {
+            this.wait();
+          } catch (InterruptedException e) {}
+        }
+        LruBlockCache cache = this.cache.get();
+        if (cache == null)
+          break;
+        cache.evict();
+      }
+    }
+    
+    public void evict() {
+      synchronized (this) {
+        this.notify();
+      }
+    }
+  }
+  
+  /*
+   * Statistics thread. Periodically prints the cache statistics to the log.
+   */
+  private static class StatisticsThread extends Thread {
+    LruBlockCache lru;
+    
+    public StatisticsThread(LruBlockCache lru) {
+      super("LruBlockCache.StatisticsThread");
+      setDaemon(true);
+      this.lru = lru;
+    }
+    
+    @Override
+    public void run() {
+      lru.logStats();
+    }
+  }
+  
+  public void logStats() {
+    // Log size
+    long totalSize = heapSize();
+    long freeSize = maxSize - totalSize;
+    float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
+    float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
+    float maxMB = ((float) maxSize) / ((float) (1024 * 1024));
+    LruBlockCache.LOG.debug("Cache Stats: Sizes: " + "Total=" + sizeMB + "MB (" + totalSize + "), " + "Free=" + freeMB + "MB (" + freeSize + "), " + "Max="
+        + maxMB + "MB (" + maxSize + ")" + ", Counts: " + "Blocks=" + size() + ", " + "Access=" + stats.getRequestCount() + ", " + "Hit=" + stats.getHitCount()
+        + ", " + "Miss=" + stats.getMissCount() + ", " + "Evictions=" + stats.getEvictionCount() + ", " + "Evicted=" + stats.getEvictedCount() + ", Ratios: "
+        + "Hit Ratio=" + stats.getHitRatio() * 100 + "%, " + "Miss Ratio=" + stats.getMissRatio() * 100 + "%, " + "Evicted/Run=" + stats.evictedPerEviction()
+        + ", " + "Duplicate Reads=" + stats.getDuplicateReads());
+  }
+  
+  /**
+   * Get counter statistics for this cache.
+   * 
+   * <p>
+   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
+   */
+  public CacheStats getStats() {
+    return this.stats;
+  }
+  
+  public static class CacheStats {
+    private final AtomicLong accessCount = new AtomicLong(0);
+    private final AtomicLong hitCount = new AtomicLong(0);
+    private final AtomicLong missCount = new AtomicLong(0);
+    private final AtomicLong evictionCount = new AtomicLong(0);
+    private final AtomicLong evictedCount = new AtomicLong(0);
+    private final AtomicLong duplicateReads = new AtomicLong(0);
+    
+    public void miss() {
+      missCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+    
+    public void hit() {
+      hitCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+    
+    public void evict() {
+      evictionCount.incrementAndGet();
+    }
+    
+    public void duplicateReads() {
+      duplicateReads.incrementAndGet();
+    }
+    
+    public void evicted() {
+      evictedCount.incrementAndGet();
+    }
+    
+    public long getRequestCount() {
+      return accessCount.get();
+    }
+    
+    public long getMissCount() {
+      return missCount.get();
+    }
+    
+    public long getHitCount() {
+      return hitCount.get();
+    }
+    
+    public long getEvictionCount() {
+      return evictionCount.get();
+    }
+    
+    public long getDuplicateReads() {
+      return duplicateReads.get();
+    }
+    
+    public long getEvictedCount() {
+      return evictedCount.get();
+    }
+    
+    public double getHitRatio() {
+      return ((float) getHitCount() / (float) getRequestCount());
+    }
+    
+    public double getMissRatio() {
+      return ((float) getMissCount() / (float) getRequestCount());
+    }
+    
+    public double evictedPerEviction() {
+      return (float) ((float) getEvictedCount() / (float) getEvictionCount());
+    }
+  }
+  
+  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
+      + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT);
+  
+  // HeapSize implementation
+  public long heapSize() {
+    return getCurrentSize();
+  }
+  
+  public static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
+    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+        + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+  }
+  
+  // Simple calculators of sizes given factors and maxSize
+  
+  private long acceptableSize() {
+    return (long) Math.floor(this.maxSize * this.acceptableFactor);
+  }
+  
+  private long minSize() {
+    return (long) Math.floor(this.maxSize * this.minFactor);
+  }
+  
+  private long singleSize() {
+    return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
+  }
+  
+  private long multiSize() {
+    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
+  }
+  
+  private long memorySize() {
+    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
+  }
+  
+  public void shutdown() {
+    this.scheduleThreadPool.shutdown();
+  }
+}

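A hedged usage sketch tying the factors together (the sizes are invented; the arithmetic follows the defaults defined above):

    // 100MB cache, ~64KB blocks, default factors:
    //   acceptable size = 100MB * 0.85 = 85MB  (a put past this triggers eviction)
    //   minimum size    = 100MB * 0.75 = 75MB  (eviction frees until under this)
    //   priority chunks are fractions of the minimum: single 25%, multi 50%, memory 25%
    LruBlockCache cache = new LruBlockCache(100 * 1024 * 1024, 64 * 1024);

    cache.cacheBlock("myfile:0", new byte[64 * 1024]);       // single-access priority
    cache.cacheBlock("myfile:1", new byte[64 * 1024], true); // in-memory priority
    CacheEntry e = cache.getBlock("myfile:0");               // hit; promotes the block to multi-access
    cache.shutdown();                                        // stops the statistics executor
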
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
new file mode 100644
index 0000000..2524cd8
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SimpleBlockCache.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple soft-reference cache, intended to hold the blocks of a single RFile.
+ */
+public class SimpleBlockCache implements BlockCache {
+  
+  private static class SimpleCacheEntry implements CacheEntry {
+    
+    private byte[] buffer;
+    private Object index;
+    
+    SimpleCacheEntry(byte[] buffer) {
+      this.buffer = buffer;
+    }
+    
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+    
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+    
+    @Override
+    public void setIndex(Object idx) {
+      this.index = idx;
+    }
+    
+  }
+  
+  private static class Ref extends SoftReference<SimpleCacheEntry> {
+    public String blockId;
+    
+    public Ref(String blockId, SimpleCacheEntry sce, ReferenceQueue<SimpleCacheEntry> q) {
+      super(sce, q);
+      this.blockId = blockId;
+    }
+  }
+  
+  private Map<String,Ref> cache = new HashMap<String,Ref>();
+  
+  private ReferenceQueue<SimpleCacheEntry> q = new ReferenceQueue<SimpleCacheEntry>();
+  public int dumps = 0;
+  
+  /**
+   * Constructor
+   */
+  public SimpleBlockCache() {
+    super();
+  }
+  
+  void processQueue() {
+    Ref r;
+    while ((r = (Ref) q.poll()) != null) {
+      cache.remove(r.blockId);
+      dumps++;
+    }
+  }
+  
+  /**
+   * @return the size
+   */
+  public synchronized int size() {
+    processQueue();
+    return cache.size();
+  }
+  
+  public synchronized SimpleCacheEntry getBlock(String blockName) {
+    processQueue(); // drain any GC'd references first
+    Ref ref = cache.get(blockName);
+    if (ref == null)
+      return null;
+    return ref.get();
+  }
+  
+  public synchronized SimpleCacheEntry cacheBlock(String blockName, byte buf[]) {
+    SimpleCacheEntry sce = new SimpleCacheEntry(buf);
+    cache.put(blockName, new Ref(blockName, sce, q));
+    return sce;
+  }
+  
+  public synchronized SimpleCacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
+    SimpleCacheEntry sce = new SimpleCacheEntry(buf);
+    cache.put(blockName, new Ref(blockName, sce, q));
+    return sce;
+  }
+  
+  public void shutdown() {
+    // noop
+  }
+  
+  @Override
+  public long getMaxSize() {
+    return Long.MAX_VALUE;
+  }
+}

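A brief sketch of the soft-reference semantics callers must tolerate (illustrative only; the block name and size are invented):

    SimpleBlockCache cache = new SimpleBlockCache();
    cache.cacheBlock("myfile:0", new byte[64 * 1024]);

    CacheEntry e = cache.getBlock("myfile:0"); // non-null while the entry is softly reachable
    // Under memory pressure the GC may clear the SoftReference at any time; the
    // ReferenceQueue drain in processQueue() then prunes the stale map entry, so
    // a later lookup can legitimately return null and the caller must re-read.
    CacheEntry maybeGone = cache.getBlock("myfile:0");
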
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java
new file mode 100644
index 0000000..f3e483f
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/SizeConstants.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+public class SizeConstants {
+  
+  public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;
+  
+  /**
+   * Size of byte in bytes
+   */
+  public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN;
+  
+  /**
+   * Size of char in bytes
+   */
+  public static final int SIZEOF_CHAR = Character.SIZE / Byte.SIZE;
+  
+  /**
+   * Size of double in bytes
+   */
+  public static final int SIZEOF_DOUBLE = Double.SIZE / Byte.SIZE;
+  
+  /**
+   * Size of float in bytes
+   */
+  public static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE;
+  
+  /**
+   * Size of int in bytes
+   */
+  public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE;
+  
+  /**
+   * Size of long in bytes
+   */
+  public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE;
+  
+  /**
+   * Size of short in bytes
+   */
+  public static final int SIZEOF_SHORT = Short.SIZE / Byte.SIZE;
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
new file mode 100644
index 0000000..4c96cbd
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.ref.SoftReference;
+
+import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.blockfile.ABlockWriter;
+import org.apache.accumulo.core.file.blockfile.BlockFileReader;
+import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * This is a wrapper class for BCFile that maintains independent caches for data blocks and meta blocks.
+ */
+
+public class CachableBlockFile {
+  
+  private CachableBlockFile() {}
+  
+  private static final Logger log = Logger.getLogger(CachableBlockFile.class);
+  
+  public static class Writer implements BlockFileWriter {
+    private BCFile.Writer _bc;
+    private BlockWrite _bw;
+    private FSDataOutputStream fsout = null;
+    
+    public Writer(FileSystem fs, Path fName, String compressAlgor, Configuration conf) throws IOException {
+      this.fsout = fs.create(fName);
+      init(fsout, compressAlgor, conf);
+    }
+    
+    public Writer(FSDataOutputStream fsout, String compressAlgor, Configuration conf) throws IOException {
+      this.fsout = fsout;
+      init(fsout, compressAlgor, conf);
+    }
+    
+    private void init(FSDataOutputStream fsout, String compressAlgor, Configuration conf) throws IOException {
+      _bc = new BCFile.Writer(fsout, compressAlgor, conf, false);
+    }
+    
+    public ABlockWriter prepareMetaBlock(String name) throws IOException {
+      _bw = new BlockWrite(_bc.prepareMetaBlock(name));
+      return _bw;
+    }
+    
+    public ABlockWriter prepareMetaBlock(String name, String compressionName) throws IOException {
+      _bw = new BlockWrite(_bc.prepareMetaBlock(name, compressionName));
+      return _bw;
+    }
+    
+    public ABlockWriter prepareDataBlock() throws IOException {
+      _bw = new BlockWrite(_bc.prepareDataBlock());
+      return _bw;
+    }
+    
+    public void close() throws IOException {
+      
+      _bw.close();
+      _bc.close();
+      
+      if (this.fsout != null) {
+        this.fsout.close();
+      }
+      
+    }
+    
+  }
+  
+  public static class BlockWrite extends DataOutputStream implements ABlockWriter {
+    BlockAppender _ba;
+    
+    public BlockWrite(BlockAppender ba) {
+      super(ba);
+      this._ba = ba;
+    }
+    
+    @Override
+    public long getCompressedSize() throws IOException {
+      return _ba.getCompressedSize();
+    }
+    
+    @Override
+    public long getRawSize() throws IOException {
+      return _ba.getRawSize();
+    }
+    
+    @Override
+    public void close() throws IOException {
+      
+      _ba.close();
+    }
+    
+    @Override
+    public DataOutputStream getStream() throws IOException {
+      
+      return this;
+    }
+    
+    @Override
+    public long getStartPos() throws IOException {
+      return _ba.getStartPos();
+    }
+    
+  }
+  
+  /**
+   * 
+   * 
+   * Class wraps the BCFile reader.
+   * 
+   */
+  public static class Reader implements BlockFileReader {
+    private BCFile.Reader _bc;
+    private String fileName = "not_available";
+    private BlockCache _dCache = null;
+    private BlockCache _iCache = null;
+    private FSDataInputStream fin = null;
+    private FileSystem fs;
+    private Configuration conf;
+    private boolean closed = false;
+    
+    private interface BlockLoader {
+      BlockReader get() throws IOException;
+      
+      String getInfo();
+    }
+    
+    private class OffsetBlockLoader implements BlockLoader {
+      
+      private int blockIndex;
+      
+      OffsetBlockLoader(int blockIndex) {
+        this.blockIndex = blockIndex;
+      }
+      
+      @Override
+      public BlockReader get() throws IOException {
+        return getBCFile().getDataBlock(blockIndex);
+      }
+      
+      @Override
+      public String getInfo() {
+        return "" + blockIndex;
+      }
+      
+    }
+    
+    private class RawBlockLoader implements BlockLoader {
+      
+      private long offset;
+      private long compressedSize;
+      private long rawSize;
+      
+      RawBlockLoader(long offset, long compressedSize, long rawSize) {
+        this.offset = offset;
+        this.compressedSize = compressedSize;
+        this.rawSize = rawSize;
+      }
+      
+      @Override
+      public BlockReader get() throws IOException {
+        return getBCFile().getDataBlock(offset, compressedSize, rawSize);
+      }
+      
+      @Override
+      public String getInfo() {
+        return "" + offset + "," + compressedSize + "," + rawSize;
+      }
+    }
+    
+    private class MetaBlockLoader implements BlockLoader {
+      
+      private String name;
+      
+      MetaBlockLoader(String name) {
+        this.name = name;
+      }
+      
+      @Override
+      public BlockReader get() throws IOException {
+        return getBCFile().getMetaBlock(name);
+      }
+      
+      @Override
+      public String getInfo() {
+        return name;
+      }
+    }
+    
+    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index) throws IOException {
+      
+      /*
+       * Defer opening the file: the input stream and length are obtained lazily in getBCFile().
+       */
+      
+      fileName = dataFile.toString();
+      this._dCache = data;
+      this._iCache = index;
+      this.fs = fs;
+      this.conf = conf;
+    }
+    
+    public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data, BlockCache index) throws IOException {
+      this._dCache = data;
+      this._iCache = index;
+      init(fsin, len, conf);
+    }
+
+    public Reader(FSDataInputStream fsin, long len, Configuration conf) throws IOException {
+      init(fsin, len, conf);
+    }
+    
+    private void init(FSDataInputStream fsin, long len, Configuration conf) throws IOException {
+      this._bc = new BCFile.Reader(this, fsin, len, conf);
+    }
+    
+    private synchronized BCFile.Reader getBCFile() throws IOException {
+      if (closed)
+        throw new IllegalStateException("File " + fileName + " is closed");
+      
+      if (_bc == null) {
+        // lazily open file if needed
+        Path path = new Path(fileName);
+        fin = fs.open(path);
+        init(fin, fs.getFileStatus(path).getLen(), conf);
+      }
+      
+      return _bc;
+    }
+    
+    public BlockRead getCachedMetaBlock(String blockName) throws IOException {
+      String _lookup = fileName + "M" + blockName;
+      
+      if (_iCache != null) {
+        CacheEntry cacheEntry = _iCache.getBlock(_lookup);
+        
+        if (cacheEntry != null) {
+          return new CachedBlockRead(cacheEntry, cacheEntry.getBuffer());
+        }
+        
+      }
+      
+      return null;
+    }
+    
+    public BlockRead cacheMetaBlock(String blockName, BlockReader _currBlock) throws IOException {
+      String _lookup = fileName + "M" + blockName;
+      return cacheBlock(_lookup, _iCache, _currBlock, blockName);
+    }
+    
+    public void cacheMetaBlock(String blockName, byte[] b) {
+      
+      if (_iCache == null)
+        return;
+      
+      String _lookup = fileName + "M" + blockName;
+      try {
+        _iCache.cacheBlock(_lookup, b);
+      } catch (Exception e) {
+        log.warn("Already cached block: " + _lookup, e);
+      }
+    }
+    
+    private BlockRead getBlock(String _lookup, BlockCache cache, BlockLoader loader) throws IOException {
+      
+      BlockReader _currBlock;
+      
+      if (cache != null) {
+        CacheEntry cb = cache.getBlock(_lookup);
+        
+        if (cb != null) {
+          return new CachedBlockRead(cb, cb.getBuffer());
+        }
+        
+      }
+      /*
+       * Load the block; at this point it is still in the data stream.
+       */
+      _currBlock = loader.get();
+      
+      /*
+       * If the block is bigger than the cache, cacheBlock will just return the stream.
+       */
+      return cacheBlock(_lookup, cache, _currBlock, loader.getInfo());
+      
+    }
+    
+    private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock, String block) throws IOException {
+      
+      if ((cache == null) || (_currBlock.getRawSize() > cache.getMaxSize())) {
+        return new BlockRead(_currBlock, _currBlock.getRawSize());
+      } else {
+        
+        /*
+         * Fully read the block so it can be cached; close the block reader whether or not the read succeeds.
+         */
+        byte b[] = null;
+        try {
+          b = new byte[(int) _currBlock.getRawSize()];
+          _currBlock.readFully(b);
+        } catch (IOException e) {
+          log.debug("Error full blockRead for file " + fileName + " for block " + block, e);
+          throw e;
+        } finally {
+          _currBlock.close();
+        }
+        
+        CacheEntry ce = null;
+        try {
+          ce = cache.cacheBlock(_lookup, b);
+        } catch (Exception e) {
+          log.warn("Already cached block: " + _lookup, e);
+        }
+        
+        if (ce == null)
+          return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
+        else
+          return new CachedBlockRead(ce, ce.getBuffer());
+        
+      }
+    }
+    
+    /**
+     * It is intended that once the BlockRead object is returned to the caller, the caller will read the entire block and then call close on it.
+     * 
+     * NOTE: with multiple reader threads, this method can do redundant work: one thread may read an entry from disk while other threads check the cache before
+     * it has been inserted.
+     */
+    public BlockRead getMetaBlock(String blockName) throws IOException {
+      String _lookup = this.fileName + "M" + blockName;
+      return getBlock(_lookup, _iCache, new MetaBlockLoader(blockName));
+    }
+    
+    @Override
+    public ABlockReader getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException {
+      String _lookup = this.fileName + "R" + offset;
+      return getBlock(_lookup, _iCache, new RawBlockLoader(offset, compressedSize, rawSize));
+    }
+    
+    /**
+     * It is intended that once the BlockRead object is returned to the caller, the caller will read the entire block and then call close on it.
+     * 
+     * NOTE: with multiple reader threads, this method can do redundant work: one thread may read an entry from disk while other threads check the cache before
+     * it has been inserted.
+     */
+    public BlockRead getDataBlock(int blockIndex) throws IOException {
+      String _lookup = this.fileName + "O" + blockIndex;
+      return getBlock(_lookup, _dCache, new OffsetBlockLoader(blockIndex));
+      
+    }
+    
+    @Override
+    public ABlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException {
+      String _lookup = this.fileName + "R" + offset;
+      return getBlock(_lookup, _dCache, new RawBlockLoader(offset, compressedSize, rawSize));
+    }
+    
+    public synchronized void close() throws IOException {
+      if (closed)
+        return;
+      
+      closed = true;
+      
+      if (_bc != null)
+        _bc.close();
+      
+      if (fin != null) {
+        fin.close();
+      }
+    }
+    
+  }
+  
+  static class SeekableByteArrayInputStream extends ByteArrayInputStream {
+    
+    public SeekableByteArrayInputStream(byte[] buf) {
+      super(buf);
+    }
+    
+    public SeekableByteArrayInputStream(byte buf[], int offset, int length) {
+      super(buf, offset, length);
+      // this constructor is not needed; seek() assumes the offset is zero, so fail fast rather than mis-seek
+      throw new UnsupportedOperationException("Seek code assumes offset is zero");
+    }
+    
+    public void seek(int position) {
+      if (position < 0 || position >= buf.length)
+        throw new IllegalArgumentException("position = " + position + " buf.length = " + buf.length);
+      this.pos = position;
+    }
+    
+    public int getPosition() {
+      return this.pos;
+    }
+    
+  }
+
+  public static class CachedBlockRead extends BlockRead {
+    private SeekableByteArrayInputStream seekableInput;
+    private CacheEntry cb;
+    
+    public CachedBlockRead(CacheEntry cb, byte buf[]) {
+      this(new SeekableByteArrayInputStream(buf), buf.length);
+      this.cb = cb;
+    }
+    
+    private CachedBlockRead(SeekableByteArrayInputStream seekableInput, long size) {
+      super(seekableInput, size);
+      this.seekableInput = seekableInput;
+    }
+
+    @Override
+    public void seek(int position) {
+      seekableInput.seek(position);
+    }
+    
+    @Override
+    public int getPosition() {
+      return seekableInput.getPosition();
+    }
+    
+    @Override
+    public boolean isIndexable() {
+      return true;
+    }
+    
+    @Override
+    public <T> T getIndex(Class<T> clazz) {
+      T bi = null;
+      synchronized (cb) {
+        @SuppressWarnings("unchecked")
+        SoftReference<T> softRef = (SoftReference<T>) cb.getIndex();
+        if (softRef != null)
+          bi = softRef.get();
+        
+        if (bi == null) {
+          try {
+            bi = clazz.newInstance();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          cb.setIndex(new SoftReference<T>(bi));
+        }
+      }
+      
+      return bi;
+    }
+  }
+
+  /**
+   * Class provides functionality to read one block from the underlying BCFile. Since blocks are cached in the Reader class as byte arrays, this class wraps a
+   * DataInputStream(ByteArrayInputStream(cachedBlock)).
+   */
+  public static class BlockRead extends DataInputStream implements ABlockReader {
+    private long size;
+    
+    public BlockRead(InputStream in, long size) {
+      super(in);
+      this.size = size;
+    }
+    
+    /**
+     * Size is the size of the byte array that was read from the cache.
+     */
+    public long getRawSize() {
+      return size;
+    }
+    
+    /**
+     * It is intended that the caller of this method will close the stream, and that it be called only once per BlockRead. This method is provided for callers
+     * upstream that expect to receive a DataInputStream object.
+     */
+    @Override
+    public DataInputStream getStream() throws IOException {
+      return this;
+    }
+    
+    @Override
+    public boolean isIndexable() {
+      return false;
+    }
+    
+    @Override
+    public void seek(int position) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public int getPosition() {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public <T> T getIndex(Class<T> clazz) {
+      throw new UnsupportedOperationException();
+    }
+    
+  }
+}
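
The read path above composes cache keys from the file name plus a one-letter discriminator: "M" + name for meta blocks, "O" + index for offset-indexed data blocks, and "R" + offset for raw blocks. A hedged usage sketch of the Reader (assuming the SimpleBlockCache from this patch has a no-arg constructor; the path and error handling are invented for illustration):

    import java.io.DataInputStream;

    import org.apache.accumulo.core.file.blockfile.cache.SimpleBlockCache;
    import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Separate caches for data blocks and index/meta blocks, as the Reader expects.
        CachableBlockFile.Reader reader = new CachableBlockFile.Reader(fs, new Path("/tmp/example.rf"), conf,
            new SimpleBlockCache(), new SimpleBlockCache());

        // The first call loads block 0 from the BCFile and caches it under "<file>O0";
        // a second call for the same index would be served from the data cache.
        DataInputStream in = reader.getDataBlock(0).getStream();
        try {
          // read the entire block here, per the javadoc contract above
        } finally {
          in.close();
          reader.close();
        }
      }
    }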

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java
new file mode 100644
index 0000000..244bd49
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnFamilyFunctor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.keyfunctor;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.util.bloom.Key;
+
+public class ColumnFamilyFunctor implements KeyFunctor {
+  
+  public static final PartialKey kDepth = PartialKey.ROW_COLFAM;
+  
+  @Override
+  public org.apache.hadoop.util.bloom.Key transform(org.apache.accumulo.core.data.Key acuKey) {
+    
+    byte keyData[];
+    
+    ByteSequence row = acuKey.getRowData();
+    ByteSequence cf = acuKey.getColumnFamilyData();
+    keyData = new byte[row.length() + cf.length()];
+    System.arraycopy(row.getBackingArray(), row.offset(), keyData, 0, row.length());
+    System.arraycopy(cf.getBackingArray(), cf.offset(), keyData, row.length(), cf.length());
+    
+    return new org.apache.hadoop.util.bloom.Key(keyData, 1.0);
+  }
+  
+  @Override
+  public Key transform(Range range) {
+    if (RowFunctor.isRangeInBloomFilter(range, PartialKey.ROW_COLFAM)) {
+      return transform(range.getStartKey());
+    }
+    return null;
+  }
+  
+}
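
ColumnFamilyFunctor builds its bloom key by concatenating the row bytes and column family bytes, honoring each ByteSequence's offset into its backing array. A small illustrative driver (the row and family values are invented):

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.file.keyfunctor.ColumnFamilyFunctor;
    import org.apache.hadoop.io.Text;

    public class FunctorExample {
      public static void main(String[] args) {
        // The bloom key for (row="r1", cf="cf1") is the 5-byte concatenation "r1cf1", weight 1.0.
        org.apache.hadoop.util.bloom.Key bloomKey =
            new ColumnFamilyFunctor().transform(new Key(new Text("r1"), new Text("cf1")));
        System.out.println(bloomKey.getBytes().length); // prints 5
      }
    }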

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java
new file mode 100644
index 0000000..7456486
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/ColumnQualifierFunctor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.keyfunctor;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.util.bloom.Key;
+
+public class ColumnQualifierFunctor implements KeyFunctor {
+  
+  @Override
+  public org.apache.hadoop.util.bloom.Key transform(org.apache.accumulo.core.data.Key acuKey) {
+    byte keyData[];
+    
+    ByteSequence row = acuKey.getRowData();
+    ByteSequence cf = acuKey.getColumnFamilyData();
+    ByteSequence cq = acuKey.getColumnQualifierData();
+    keyData = new byte[row.length() + cf.length() + cq.length()];
+    System.arraycopy(row.getBackingArray(), row.offset(), keyData, 0, row.length());
+    System.arraycopy(cf.getBackingArray(), cf.offset(), keyData, row.length(), cf.length());
+    System.arraycopy(cq.getBackingArray(), cq.offset(), keyData, row.length() + cf.length(), cq.length());
+    
+    return new org.apache.hadoop.util.bloom.Key(keyData, 1.0);
+  }
+  
+  @Override
+  public Key transform(Range range) {
+    if (RowFunctor.isRangeInBloomFilter(range, PartialKey.ROW_COLFAM_COLQUAL)) {
+      return transform(range.getStartKey());
+    }
+    return null;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
new file mode 100644
index 0000000..c5a0e59
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/KeyFunctor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.keyfunctor;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+
+public interface KeyFunctor {
+  /**
+   * Implementations should return null if a range cannot be converted to a bloom filter key.
+   */
+  public org.apache.hadoop.util.bloom.Key transform(Range range);
+  
+  public org.apache.hadoop.util.bloom.Key transform(Key key);
+}
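
The null contract matters to callers: a null bloom key means the range cannot be tested against the filter, so the read must proceed as if the filter might contain it. A minimal sketch of such a caller (not the actual bloom filter layer; names other than the Hadoop and Accumulo types are invented):

    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.file.keyfunctor.KeyFunctor;
    import org.apache.hadoop.util.bloom.BloomFilter;

    public class BloomProbeSketch {
      static boolean mightContain(BloomFilter bloomFilter, KeyFunctor functor, Range range) {
        org.apache.hadoop.util.bloom.Key bloomKey = functor.transform(range);
        if (bloomKey == null)
          return true; // range not convertible to a bloom key; cannot rule it out
        return bloomFilter.membershipTest(bloomKey);
      }
    }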

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
new file mode 100644
index 0000000..06597a3
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/keyfunctor/RowFunctor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.keyfunctor;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.util.bloom.Key;
+
+public class RowFunctor implements KeyFunctor {
+  
+  @Override
+  public org.apache.hadoop.util.bloom.Key transform(org.apache.accumulo.core.data.Key acuKey) {
+    byte keyData[];
+    
+    ByteSequence row = acuKey.getRowData();
+    keyData = new byte[row.length()];
+    System.arraycopy(row.getBackingArray(), row.offset(), keyData, 0, row.length());
+    
+    return new org.apache.hadoop.util.bloom.Key(keyData, 1.0);
+  }
+  
+  @Override
+  public Key transform(Range range) {
+    if (isRangeInBloomFilter(range, PartialKey.ROW)) {
+      return transform(range.getStartKey());
+    }
+    return null;
+  }
+  
+  static boolean isRangeInBloomFilter(Range range, PartialKey keyDepth) {
+    
+    if (range.getStartKey() == null || range.getEndKey() == null) {
+      return false;
+    }
+    
+    if (range.getStartKey().equals(range.getEndKey(), keyDepth))
+      return true;
+    
+    // include everything but the deleted flag in the comparison...
+    return range.getStartKey().followingKey(keyDepth).equals(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME) && !range.isEndKeyInclusive();
+  }
+}
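
In other words, a range qualifies when its start and end keys agree at the given key depth, or when it covers exactly one prefix at that depth (the end key is the immediate successor of the start and is exclusive). A quick illustration with RowFunctor (row values invented):

    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.file.keyfunctor.RowFunctor;
    import org.apache.hadoop.io.Text;

    public class RangeCheckExample {
      public static void main(String[] args) {
        RowFunctor functor = new RowFunctor();
        // A range covering exactly one row converts to a bloom key...
        System.out.println(functor.transform(new Range(new Text("r1"))) != null);               // true
        // ...but a range spanning multiple rows does not.
        System.out.println(functor.transform(new Range(new Text("a"), new Text("b"))) == null); // true
      }
    }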

