accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [25/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 21:59:50 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
new file mode 100644
index 0000000..5374332
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class RFileOperations extends FileOperations {
+  
+  private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+  
+  @Override
+  public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return fs.getFileStatus(new Path(file)).getLen();
+  }
+  
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    
+    return openIndex(file, fs, conf, acuconf, null, null);
+  }
+  
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache)
+      throws IOException {
+    Path path = new Path(file);
+    // long len = fs.getFileStatus(path).getLen();
+    // FSDataInputStream in = fs.open(path);
+    // Reader reader = new RFile.Reader(in, len , conf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache);
+    final Reader reader = new RFile.Reader(_cbr);
+    
+    return reader.getIndex();
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
+      BlockCache dataCache, BlockCache indexCache) throws IOException {
+    Path path = new Path(file);
+    
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache);
+    Reader iter = new RFile.Reader(_cbr);
+    
+    if (seekToBeginning) {
+      iter.seek(new Range((Key) null, null), EMPTY_CF_SET, false);
+    }
+    
+    return iter;
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, null, null);
+    iter.seek(range, columnFamilies, inclusive);
+    return iter;
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, dataCache, indexCache);
+    iter.seek(range, columnFamilies, inclusive);
+    return iter;
+  }
+  
+  @Override
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    int hrep = conf.getInt("dfs.replication", -1);
+    int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+    int rep = hrep;
+    if (trep > 0 && trep != hrep) {
+      rep = trep;
+    }
+    long hblock = conf.getLong("dfs.block.size", 1 << 26);
+    long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+    long block = hblock;
+    if (tblock > 0)
+      block = tblock;
+    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    
+    long blockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
+    long indexBlockSize = acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+    
+    String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
+    
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf);
+    Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize);
+    return writer;
+  }
+}
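
For context, a minimal sketch of how this factory might be used to open and scan an RFile, assuming a reachable FileSystem and the default configuration (the path below is illustrative, not from the commit):

    Configuration conf = CachedConfiguration.getInstance();
    FileSystem fs = FileSystem.get(conf);
    AccumuloConfiguration acuconf = DefaultConfiguration.getDefaultConfiguration();

    // open the file, seeking to the first key/value pair
    FileSKVIterator iter = new RFileOperations().openReader("/accumulo/tables/1/default_tablet/F0000000.rf", true, fs, conf, acuconf);
    while (iter.hasTop()) {
      Key key = iter.getTopKey();        // current key
      Value value = iter.getTopValue();  // current value
      iter.next();
    }
    iter.close();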

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
new file mode 100644
index 0000000..1711765
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RelativeKey.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class RelativeKey implements WritableComparable<RelativeKey> {
+  
+  private Key key;
+  
+  private byte fieldsSame;
+  
+  private Key prevKey;
+  
+  private static final byte ROW_SAME = 0x01;
+  private static final byte CF_SAME = 0x02;
+  private static final byte CQ_SAME = 0x04;
+  private static final byte CV_SAME = 0x08;
+  private static final byte TS_SAME = 0x10;
+  private static final byte DELETED = 0x20;
+  
+  private static HashMap<Text,Integer> colFams = new HashMap<Text,Integer>();
+  
+  private static long bytesWritten = 0;
+  
+  public static void printStats() throws Exception {
+    System.out.println("colFams.size() : " + colFams.size());
+    Set<Entry<Text,Integer>> es = colFams.entrySet();
+    
+    int sum = 0;
+    
+    for (Entry<Text,Integer> entry : es) {
+      sum += entry.getKey().getLength();
+    }
+    
+    System.out.println("Total Column name bytes : " + sum);
+    
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(baos));
+    for (Entry<Text,Integer> entry : es) {
+      entry.getKey().write(dos);
+      dos.writeInt(entry.getValue());
+    }
+    
+    dos.close();
+    
+    System.out.println("Compressed column map size : " + baos.toByteArray().length);
+    System.out.printf("Bytes written : %,d\n", bytesWritten);
+    
+  }
+  
+  public RelativeKey() {
+    
+  }
+  
+  public RelativeKey(Key prevKey, Key key) {
+    
+    this.key = key;
+    
+    fieldsSame = 0;
+    
+    if (prevKey != null) {
+      if (prevKey.getRowData().equals(key.getRowData()))
+        fieldsSame |= ROW_SAME;
+      
+      if (prevKey.getColumnFamilyData().equals(key.getColumnFamilyData()))
+        fieldsSame |= CF_SAME;
+      
+      if (prevKey.getColumnQualifierData().equals(key.getColumnQualifierData()))
+        fieldsSame |= CQ_SAME;
+      
+      if (prevKey.getColumnVisibilityData().equals(key.getColumnVisibilityData()))
+        fieldsSame |= CV_SAME;
+      
+      if (prevKey.getTimestamp() == key.getTimestamp())
+        fieldsSame |= TS_SAME;
+      
+    }
+    
+    // store deleted information in the bit vector instead of in its own byte
+    if (key.isDeleted())
+      fieldsSame |= DELETED;
+  }
+  
+  public void setPrevKey(Key pk) {
+    this.prevKey = pk;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    fieldsSame = in.readByte();
+    
+    byte[] row, cf, cq, cv;
+    long ts;
+    
+    if ((fieldsSame & ROW_SAME) == 0) {
+      row = read(in);
+    } else {
+      row = prevKey.getRowData().toArray();
+    }
+    
+    if ((fieldsSame & CF_SAME) == 0) {
+      cf = read(in);
+    } else {
+      cf = prevKey.getColumnFamilyData().toArray();
+    }
+    
+    if ((fieldsSame & CQ_SAME) == 0) {
+      cq = read(in);
+    } else {
+      cq = prevKey.getColumnQualifierData().toArray();
+    }
+    
+    if ((fieldsSame & CV_SAME) == 0) {
+      cv = read(in);
+    } else {
+      cv = prevKey.getColumnVisibilityData().toArray();
+    }
+    
+    if ((fieldsSame & TS_SAME) == 0) {
+      ts = WritableUtils.readVLong(in);
+    } else {
+      ts = prevKey.getTimestamp();
+    }
+    
+    this.key = new Key(row, cf, cq, cv, ts, (fieldsSame & DELETED) != 0, false);
+    
+    this.prevKey = this.key;
+  }
+  
+  static class MByteSequence extends ArrayByteSequence {
+    MByteSequence(byte[] data, int offset, int length) {
+      super(data, offset, length);
+    }
+    
+    MByteSequence(ByteSequence bs) {
+      super(new byte[Math.max(64, bs.length())]);
+      System.arraycopy(bs.getBackingArray(), bs.offset(), data, 0, bs.length());
+      this.length = bs.length();
+      this.offset = 0;
+    }
+    
+    void setArray(byte[] data) {
+      this.data = data;
+      this.offset = 0;
+      this.length = 0;
+    }
+    
+    void setLength(int len) {
+      this.length = len;
+    }
+  }
+  
+  int fastSkip(DataInput in, Key seekKey, MByteSequence value, Key pkey, Key currKey) throws IOException {
+    // this method assumes that fast skip is being called on a compressed block where the last key
+    // in the compressed block is >= seekKey... therefore this method should not go past the end of the
+    // compressed block... if it does, there is probably an error in the caller's logic
+    
+    // this method mostly avoids object allocation and only does compares when the row changes
+    
+    MByteSequence row, cf, cq, cv;
+    MByteSequence prow, pcf, pcq, pcv;
+    
+    ByteSequence stopRow = seekKey.getRowData();
+    ByteSequence stopCF = seekKey.getColumnFamilyData();
+    ByteSequence stopCQ = seekKey.getColumnQualifierData();
+    
+    long ts = -1;
+    long pts = -1;
+    boolean pdel = false;
+    
+    int rowCmp = -1, cfCmp = -1, cqCmp = -1;
+    
+    if (currKey != null) {
+      
+      prow = new MByteSequence(pkey.getRowData());
+      pcf = new MByteSequence(pkey.getColumnFamilyData());
+      pcq = new MByteSequence(pkey.getColumnQualifierData());
+      pcv = new MByteSequence(pkey.getColumnVisibilityData());
+      pts = pkey.getTimestamp();
+      
+      row = new MByteSequence(currKey.getRowData());
+      cf = new MByteSequence(currKey.getColumnFamilyData());
+      cq = new MByteSequence(currKey.getColumnQualifierData());
+      cv = new MByteSequence(currKey.getColumnVisibilityData());
+      ts = currKey.getTimestamp();
+      
+      rowCmp = row.compareTo(stopRow);
+      cfCmp = cf.compareTo(stopCF);
+      cqCmp = cq.compareTo(stopCQ);
+      
+      if (rowCmp >= 0) {
+        if (rowCmp > 0)
+          return 0;
+        
+        if (cfCmp >= 0) {
+          if (cfCmp > 0)
+            return 0;
+          
+          if (cqCmp >= 0)
+            return 0;
+        }
+      }
+      
+    } else {
+      row = new MByteSequence(new byte[64], 0, 0);
+      cf = new MByteSequence(new byte[64], 0, 0);
+      cq = new MByteSequence(new byte[64], 0, 0);
+      cv = new MByteSequence(new byte[64], 0, 0);
+      
+      prow = new MByteSequence(new byte[64], 0, 0);
+      pcf = new MByteSequence(new byte[64], 0, 0);
+      pcq = new MByteSequence(new byte[64], 0, 0);
+      pcv = new MByteSequence(new byte[64], 0, 0);
+    }
+    
+    byte fieldsSame = -1;
+    int count = 0;
+    
+    while (true) {
+      
+      pdel = (fieldsSame & DELETED) != 0;
+      
+      fieldsSame = in.readByte();
+      
+      boolean changed = false;
+      
+      if ((fieldsSame & ROW_SAME) == 0) {
+        
+        MByteSequence tmp = prow;
+        prow = row;
+        row = tmp;
+        
+        read(in, row);
+        
+        // read a new row, so need to compare...
+        rowCmp = row.compareTo(stopRow);
+        changed = true;
+      }// else the row is the same as the last, so no need to compare
+      
+      if ((fieldsSame & CF_SAME) == 0) {
+        
+        MByteSequence tmp = pcf;
+        pcf = cf;
+        cf = tmp;
+        
+        read(in, cf);
+        
+        cfCmp = cf.compareTo(stopCF);
+        changed = true;
+      }
+      
+      if ((fieldsSame & CQ_SAME) == 0) {
+        
+        MByteSequence tmp = pcq;
+        pcq = cq;
+        cq = tmp;
+        
+        read(in, cq);
+        
+        cqCmp = cq.compareTo(stopCQ);
+        changed = true;
+      }
+      
+      if ((fieldsSame & CV_SAME) == 0) {
+        
+        MByteSequence tmp = pcv;
+        pcv = cv;
+        cv = tmp;
+        
+        read(in, cv);
+      }
+      
+      if ((fieldsSame & TS_SAME) == 0) {
+        pts = ts;
+        ts = WritableUtils.readVLong(in);
+      }
+      
+      readValue(in, value);
+      
+      count++;
+      
+      if (changed && rowCmp >= 0) {
+        if (rowCmp > 0)
+          break;
+        
+        if (cfCmp >= 0) {
+          if (cfCmp > 0)
+            break;
+          
+          if (cqCmp >= 0)
+            break;
+        }
+      }
+      
+    }
+    
+    if (count > 1) {
+      MByteSequence trow, tcf, tcq, tcv;
+      long tts;
+      
+      // when the current key's field is the same as the last key's, then
+      // set the prev key's field to the same as the current key's
+      trow = (fieldsSame & ROW_SAME) == 0 ? prow : row;
+      tcf = (fieldsSame & CF_SAME) == 0 ? pcf : cf;
+      tcq = (fieldsSame & CQ_SAME) == 0 ? pcq : cq;
+      tcv = (fieldsSame & CV_SAME) == 0 ? pcv : cv;
+      tts = (fieldsSame & TS_SAME) == 0 ? pts : ts;
+      
+      Key tmp = new Key(trow.getBackingArray(), trow.offset(), trow.length(), tcf.getBackingArray(), tcf.offset(), tcf.length(), tcq.getBackingArray(),
+          tcq.offset(), tcq.length(), tcv.getBackingArray(), tcv.offset(), tcv.length(), tts);
+      tmp.setDeleted(pdel);
+      pkey.set(tmp);
+    }
+    
+    this.key = new Key(row.getBackingArray(), row.offset(), row.length(), cf.getBackingArray(), cf.offset(), cf.length(), cq.getBackingArray(), cq.offset(),
+        cq.length(), cv.getBackingArray(), cv.offset(), cv.length(), ts);
+    this.key.setDeleted((fieldsSame & DELETED) != 0);
+    
+    this.prevKey = this.key;
+    
+    return count;
+  }
+  
+  private void read(DataInput in, MByteSequence mbseq) throws IOException {
+    int len = WritableUtils.readVInt(in);
+    read(in, mbseq, len);
+  }
+  
+  private void readValue(DataInput in, MByteSequence mbseq) throws IOException {
+    int len = in.readInt();
+    read(in, mbseq, len);
+  }
+  
+  private void read(DataInput in, MByteSequence mbseq, int len) throws IOException {
+    if (mbseq.getBackingArray().length < len) {
+      int newLen = mbseq.getBackingArray().length;
+      
+      while (newLen < len) {
+        newLen = newLen * 2;
+      }
+      
+      mbseq.setArray(new byte[newLen]);
+    }
+    
+    in.readFully(mbseq.getBackingArray(), 0, len);
+    mbseq.setLength(len);
+  }
+  
+  private byte[] read(DataInput in) throws IOException {
+    int len = WritableUtils.readVInt(in);
+    byte[] data = new byte[len];
+    in.readFully(data);
+    return data;
+  }
+  
+  public Key getKey() {
+    return key;
+  }
+  
+  private void write(DataOutput out, ByteSequence bs) throws IOException {
+    WritableUtils.writeVInt(out, bs.length());
+    out.write(bs.getBackingArray(), bs.offset(), bs.length());
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    
+    out.writeByte(fieldsSame);
+    
+    // System.out.printf("wrote fs %x\n", fieldsSame);
+    
+    bytesWritten += 1;
+    
+    if ((fieldsSame & ROW_SAME) == 0) {
+      write(out, key.getRowData());
+    }
+    
+    if ((fieldsSame & CF_SAME) == 0) {
+      write(out, key.getColumnFamilyData());
+    }
+    
+    if ((fieldsSame & CQ_SAME) == 0) {
+      
+      write(out, key.getColumnQualifierData());
+      
+      /*
+       * Integer id = colFams.get(key.getColumnQualifier()); if(id == null){ id = nextId++; colFams.put(key.getColumnQualifier(), id); }
+       * 
+       * WritableUtils.writeVInt(out, id); bytesWritten += 1;
+       */
+      
+    }
+    
+    if ((fieldsSame & CV_SAME) == 0) {
+      write(out, key.getColumnVisibilityData());
+    }
+    
+    if ((fieldsSame & TS_SAME) == 0) {
+      WritableUtils.writeVLong(out, key.getTimestamp());
+    }
+  }
+  
+  @Override
+  public int compareTo(RelativeKey o) {
+    throw new UnsupportedOperationException();
+  }
+  
+}
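
A small round-trip sketch of the relative encoding (the keys and streams below are illustrative assumptions): fields equal to the previous key are flagged in the fieldsSame bit vector and omitted from the output, and readFields() restores them from the previous key set via setPrevKey().

    Key prev = new Key(new Text("row1"), new Text("fam"), new Text("qualA"));
    Key curr = new Key(new Text("row1"), new Text("fam"), new Text("qualB"));

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    RelativeKey rk = new RelativeKey(prev, curr);  // ROW_SAME and CF_SAME bits are set
    rk.write(new DataOutputStream(baos));          // only the differing fields are serialized

    RelativeKey decoded = new RelativeKey();
    decoded.setPrevKey(prev);                      // fields flagged "same" are copied from prev
    decoded.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    Key roundTripped = decoded.getKey();           // equal to curr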

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
new file mode 100644
index 0000000..f5b218b
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Split an RFile into large and small key/value files.
+ * 
+ */
+public class SplitLarge {
+  
+  public static void main(String[] args) throws Exception {
+    Configuration conf = CachedConfiguration.getInstance();
+    FileSystem fs = FileSystem.get(conf);
+    long maxSize = 10 * 1024 * 1024;
+    
+    Options opts = new Options();
+    Option maxSizeOption = new Option("m", "", true, "the maximum size of the key/value pair to shunt to the small file");
+    opts.addOption(maxSizeOption);
+    
+    CommandLine commandLine = new BasicParser().parse(opts, args);
+    if (commandLine.hasOption(maxSizeOption.getOpt())) {
+      maxSize = Long.parseLong(commandLine.getOptionValue(maxSizeOption.getOpt()));
+    }
+    
+    for (String arg : commandLine.getArgs()) {
+      Path path = new Path(arg);
+      CachableBlockFile.Reader rdr = new CachableBlockFile.Reader(fs, path, conf, null, null);
+      Reader iter = new RFile.Reader(rdr);
+      
+      if (!arg.endsWith(".rf")) {
+        throw new IllegalArgumentException("File must end with .rf");
+      }
+      String smallName = arg.substring(0, arg.length() - 3) + "_small.rf";
+      String largeName = arg.substring(0, arg.length() - 3) + "_large.rf";
+      
+      int blockSize = (int) DefaultConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+      Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", conf), blockSize);
+      small.startDefaultLocalityGroup();
+      Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", conf), blockSize);
+      large.startDefaultLocalityGroup();
+
+      iter.seek(new Range(), new ArrayList<ByteSequence>(), false);
+      while (iter.hasTop()) {
+        Key key = iter.getTopKey();
+        Value value = iter.getTopValue();
+        if (key.getSize() + value.getSize() < maxSize) {
+          small.append(key, value);
+        } else {
+          large.append(key, value);
+        }
+        iter.next();
+      }
+
+      iter.close();
+      large.close();
+      small.close();
+    }
+  }
+  
+}
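
As a usage sketch (paths illustrative): running SplitLarge with "-m 1048576" against /some/dir/table.rf would copy key/value pairs smaller than 1 MB into /some/dir/table_small.rf and the rest into /some/dir/table_large.rf, per the naming and size check in main() above.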

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
new file mode 100644
index 0000000..e5de892
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -0,0 +1,972 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead;
+import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.Scalar;
+import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarComparator;
+import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarLong;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Block Compressed file, the underlying physical storage layer for TFile. BCFile provides the basic block-level compression for the data blocks and meta blocks.
+ * It is separated from TFile as it may be used for other block-compressed file implementations.
+ */
+public final class BCFile {
+  // the current version of the BCFile impl; increment it (major or minor) when
+  // enough changes are made
+  static final Version API_VERSION = new Version((short) 1, (short) 0);
+  static final Log LOG = LogFactory.getLog(BCFile.class);
+  
+  /**
+   * Prevent the instantiation of BCFile objects.
+   */
+  private BCFile() {
+    // nothing
+  }
+  
+  /**
+   * BCFile writer, the entry point for creating a new BCFile.
+   */
+  static public class Writer implements Closeable {
+    private final FSDataOutputStream out;
+    private final Configuration conf;
+    // the single meta block containing index of compressed data blocks
+    final DataIndex dataIndex;
+    // index for meta blocks
+    final MetaIndex metaIndex;
+    boolean blkInProgress = false;
+    private boolean metaBlkSeen = false;
+    private boolean closed = false;
+    long errorCount = 0;
+    // reusable buffers.
+    private BytesWritable fsOutputBuffer;
+    
+    /**
+     * Call-back interface to register a block after a block is closed.
+     */
+    private static interface BlockRegister {
+      /**
+       * Register a block that is fully closed.
+       * 
+       * @param raw
+       *          The size of block in terms of uncompressed bytes.
+       * @param offsetStart
+       *          The start offset of the block.
+       * @param offsetEnd
+       *          One byte after the end of the block. Compressed block size is offsetEnd - offsetStart.
+       */
+      public void register(long raw, long offsetStart, long offsetEnd);
+    }
+    
+    /**
+     * Intermediate class that maintains the state of a Writable Compression Block.
+     */
+    private static final class WBlockState {
+      private final Algorithm compressAlgo;
+      private Compressor compressor; // !null only if using native
+      // Hadoop compression
+      private final FSDataOutputStream fsOut;
+      private final long posStart;
+      private final SimpleBufferedOutputStream fsBufferedOutput;
+      private OutputStream out;
+      
+      /**
+       * @param compressionAlgo
+       *          The compression algorithm to be used for compression.
+       * @throws IOException
+       */
+      public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
+        this.compressAlgo = compressionAlgo;
+        this.fsOut = fsOut;
+        this.posStart = fsOut.getPos();
+        
+        fsOutputBuffer.setCapacity(TFile.getFSOutputBufferSize(conf));
+        
+        this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
+        this.compressor = compressAlgo.getCompressor();
+        
+        try {
+          this.out = compressionAlgo.createCompressionStream(fsBufferedOutput, compressor, 0);
+        } catch (IOException e) {
+          compressAlgo.returnCompressor(compressor);
+          throw e;
+        }
+      }
+      
+      /**
+       * Get the output stream for BlockAppender's consumption.
+       * 
+       * @return the output stream suitable for writing block data.
+       */
+      OutputStream getOutputStream() {
+        return out;
+      }
+      
+      /**
+       * Get the current position in file.
+       * 
+       * @return The current byte offset in underlying file.
+       * @throws IOException
+       */
+      long getCurrentPos() throws IOException {
+        return fsOut.getPos() + fsBufferedOutput.size();
+      }
+      
+      long getStartPos() {
+        return posStart;
+      }
+      
+      /**
+       * Current size of compressed data.
+       * 
+       * @return the current size of the compressed data.
+       * @throws IOException
+       */
+      long getCompressedSize() throws IOException {
+        long ret = getCurrentPos() - posStart;
+        return ret;
+      }
+      
+      /**
+       * Finishing up the current block.
+       */
+      public void finish() throws IOException {
+        try {
+          if (out != null) {
+            out.flush();
+            out = null;
+          }
+        } finally {
+          compressAlgo.returnCompressor(compressor);
+          compressor = null;
+        }
+      }
+    }
+    
+    /**
+     * Access point to stuff data into a block.
+     * 
+     */
+    public class BlockAppender extends DataOutputStream {
+      private final BlockRegister blockRegister;
+      private final WBlockState wBlkState;
+      private boolean closed = false;
+      
+      /**
+       * Constructor
+       * 
+       * @param register
+       *          the block register, which is called when the block is closed.
+       * @param wbs
+       *          The writable compression block state.
+       */
+      BlockAppender(BlockRegister register, WBlockState wbs) {
+        super(wbs.getOutputStream());
+        this.blockRegister = register;
+        this.wBlkState = wbs;
+      }
+      
+      /**
+       * Get the raw size of the block.
+       * 
+       * @return the number of uncompressed bytes written through the BlockAppender so far.
+       * @throws IOException
+       */
+      public long getRawSize() throws IOException {
+        /**
+         * The size() of a block is expected not to exceed 4GB; size() will wrap to a negative integer if it exceeds 2GB.
+         */
+        return size() & 0x00000000ffffffffL;
+      }
+      
+      /**
+       * Get the compressed size of the block in progress.
+       * 
+       * @return the number of compressed bytes written to the underlying FS file. The size may be smaller than what is actually needed to compress all the data
+       *         written, due to internal buffering inside the compressor.
+       * @throws IOException
+       */
+      public long getCompressedSize() throws IOException {
+        return wBlkState.getCompressedSize();
+      }
+      
+      public long getStartPos() {
+        return wBlkState.getStartPos();
+      }
+      
+      @Override
+      public void flush() {
+        // The downstream stream is a special kind of stream that finishes a
+        // compression block upon flush, so flush() is disabled here.
+      }
+      
+      /**
+       * Signaling the end of write to the block. The block register will be called for registering the finished block.
+       */
+      @Override
+      public void close() throws IOException {
+        if (closed == true) {
+          return;
+        }
+        try {
+          ++errorCount;
+          wBlkState.finish();
+          blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos());
+          --errorCount;
+        } finally {
+          closed = true;
+          blkInProgress = false;
+        }
+      }
+    }
+    
+    /**
+     * Constructor
+     * 
+     * @param fout
+     *          FS output stream.
+     * @param compressionName
+     *          Name of the compression algorithm, which will be used for all data blocks.
+     * @throws IOException
+     * @see Compression#getSupportedAlgorithms
+     */
+    public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks) throws IOException {
+      if (fout.getPos() != 0) {
+        throw new IOException("Output file not at zero offset.");
+      }
+      
+      this.out = fout;
+      this.conf = conf;
+      dataIndex = new DataIndex(compressionName, trackDataBlocks);
+      metaIndex = new MetaIndex();
+      fsOutputBuffer = new BytesWritable();
+      Magic.write(fout);
+    }
+    
+    /**
+     * Close the BCFile Writer. Attempting to use the Writer after calling <code>close</code> is not allowed and may lead to undetermined results.
+     */
+    public void close() throws IOException {
+      if (closed == true) {
+        return;
+      }
+      
+      try {
+        if (errorCount == 0) {
+          if (blkInProgress == true) {
+            throw new IllegalStateException("Close() called with active block appender.");
+          }
+          
+          // add metaBCFileIndex to metaIndex as the last meta block
+          BlockAppender appender = prepareMetaBlock(DataIndex.BLOCK_NAME, getDefaultCompressionAlgorithm());
+          try {
+            dataIndex.write(appender);
+          } finally {
+            appender.close();
+          }
+          
+          long offsetIndexMeta = out.getPos();
+          metaIndex.write(out);
+          
+          // Meta Index and the trailing section are written out directly.
+          out.writeLong(offsetIndexMeta);
+          
+          API_VERSION.write(out);
+          Magic.write(out);
+          out.flush();
+        }
+      } finally {
+        closed = true;
+      }
+    }
+    
+    private Algorithm getDefaultCompressionAlgorithm() {
+      return dataIndex.getDefaultCompressionAlgorithm();
+    }
+    
+    private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo) throws IOException, MetaBlockAlreadyExists {
+      if (blkInProgress == true) {
+        throw new IllegalStateException("Cannot create Meta Block until previous block is closed.");
+      }
+      
+      if (metaIndex.getMetaByName(name) != null) {
+        throw new MetaBlockAlreadyExists("name=" + name);
+      }
+      
+      MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
+      WBlockState wbs = new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
+      BlockAppender ba = new BlockAppender(mbr, wbs);
+      blkInProgress = true;
+      metaBlkSeen = true;
+      return ba;
+    }
+    
+    /**
+     * Create a Meta Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Regular
+     * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
+     * 
+     * @param name
+     *          The name of the Meta Block. The name must not conflict with existing Meta Blocks.
+     * @param compressionName
+     *          The name of the compression algorithm to be used.
+     * @return The BlockAppender stream
+     * @throws IOException
+     * @throws MetaBlockAlreadyExists
+     *           If the meta block with the name already exists.
+     */
+    public BlockAppender prepareMetaBlock(String name, String compressionName) throws IOException, MetaBlockAlreadyExists {
+      return prepareMetaBlock(name, Compression.getCompressionAlgorithmByName(compressionName));
+    }
+    
+    /**
+     * Create a Meta Block and obtain an output stream for adding data into the block. The Meta Block will be compressed with the same compression algorithm as
+     * data blocks. There can only be one BlockAppender stream active at any time. Regular Blocks may not be created after the first Meta Blocks. The caller
+     * must call BlockAppender.close() to conclude the block creation.
+     * 
+     * @param name
+     *          The name of the Meta Block. The name must not conflict with existing Meta Blocks.
+     * @return The BlockAppender stream
+     * @throws MetaBlockAlreadyExists
+     *           If the meta block with the name already exists.
+     * @throws IOException
+     */
+    public BlockAppender prepareMetaBlock(String name) throws IOException, MetaBlockAlreadyExists {
+      return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
+    }
+    
+    /**
+     * Create a Data Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Data
+     * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation.
+     * 
+     * @return The BlockAppender stream
+     * @throws IOException
+     */
+    public BlockAppender prepareDataBlock() throws IOException {
+      if (blkInProgress == true) {
+        throw new IllegalStateException("Cannot create Data Block until previous block is closed.");
+      }
+      
+      if (metaBlkSeen == true) {
+        throw new IllegalStateException("Cannot create Data Block after Meta Blocks.");
+      }
+      
+      DataBlockRegister dbr = new DataBlockRegister();
+      
+      WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf);
+      BlockAppender ba = new BlockAppender(dbr, wbs);
+      blkInProgress = true;
+      return ba;
+    }
+    
+    /**
+     * Callback to make sure a meta block is added to the internal list when its stream is closed.
+     */
+    private class MetaBlockRegister implements BlockRegister {
+      private final String name;
+      private final Algorithm compressAlgo;
+      
+      MetaBlockRegister(String name, Algorithm compressAlgo) {
+        this.name = name;
+        this.compressAlgo = compressAlgo;
+      }
+      
+      public void register(long raw, long begin, long end) {
+        metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw)));
+      }
+    }
+    
+    /**
+     * Callback to make sure a data block is added to the internal list when it's being closed.
+     * 
+     */
+    private class DataBlockRegister implements BlockRegister {
+      DataBlockRegister() {
+        // do nothing
+      }
+      
+      public void register(long raw, long begin, long end) {
+        dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
+      }
+    }
+  }
+  
+  /**
+   * BCFile Reader, interface to read the file's data and meta blocks.
+   */
+  static public class Reader implements Closeable {
+    private static final String META_NAME = "BCFile.metaindex";
+    private final FSDataInputStream in;
+    private final Configuration conf;
+    final DataIndex dataIndex;
+    // Index for meta blocks
+    final MetaIndex metaIndex;
+    final Version version;
+    
+    /**
+     * Intermediate class that maintains the state of a Readable Compression Block.
+     */
+    static private final class RBlockState {
+      private final Algorithm compressAlgo;
+      private Decompressor decompressor;
+      private final BlockRegion region;
+      private final InputStream in;
+      
+      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf) throws IOException {
+        this.compressAlgo = compressionAlgo;
+        this.region = region;
+        this.decompressor = compressionAlgo.getDecompressor();
+        
+        try {
+          this.in = compressAlgo.createDecompressionStream(new BoundedRangeFileInputStream(fsin, this.region.getOffset(), this.region.getCompressedSize()),
+              decompressor, TFile.getFSInputBufferSize(conf));
+        } catch (IOException e) {
+          compressAlgo.returnDecompressor(decompressor);
+          throw e;
+        }
+      }
+      
+      /**
+       * Get the input stream for BlockReader's consumption.
+       * 
+       * @return the input stream suitable for reading block data.
+       */
+      public InputStream getInputStream() {
+        return in;
+      }
+      
+      public String getCompressionName() {
+        return compressAlgo.getName();
+      }
+      
+      public BlockRegion getBlockRegion() {
+        return region;
+      }
+      
+      public void finish() throws IOException {
+        try {
+          in.close();
+        } finally {
+          compressAlgo.returnDecompressor(decompressor);
+          decompressor = null;
+        }
+      }
+    }
+    
+    /**
+     * Access point to read a block.
+     */
+    public static class BlockReader extends DataInputStream {
+      private final RBlockState rBlkState;
+      private boolean closed = false;
+      
+      BlockReader(RBlockState rbs) {
+        super(rbs.getInputStream());
+        rBlkState = rbs;
+      }
+      
+      /**
+       * Finish reading the block and release all resources.
+       */
+      @Override
+      public void close() throws IOException {
+        if (closed == true) {
+          return;
+        }
+        try {
+          // Do not set rBlkState to null. People may access stats after calling
+          // close().
+          rBlkState.finish();
+        } finally {
+          closed = true;
+        }
+      }
+      
+      /**
+       * Get the name of the compression algorithm used to compress the block.
+       * 
+       * @return name of the compression algorithm.
+       */
+      public String getCompressionName() {
+        return rBlkState.getCompressionName();
+      }
+      
+      /**
+       * Get the uncompressed size of the block.
+       * 
+       * @return uncompressed size of the block.
+       */
+      public long getRawSize() {
+        return rBlkState.getBlockRegion().getRawSize();
+      }
+      
+      /**
+       * Get the compressed size of the block.
+       * 
+       * @return compressed size of the block.
+       */
+      public long getCompressedSize() {
+        return rBlkState.getBlockRegion().getCompressedSize();
+      }
+      
+      /**
+       * Get the starting position of the block in the file.
+       * 
+       * @return the starting position of the block in the file.
+       */
+      public long getStartPos() {
+        return rBlkState.getBlockRegion().getOffset();
+      }
+    }
+    
+    /**
+     * Constructor
+     * 
+     * @param fin
+     *          FS input stream.
+     * @param fileLength
+     *          Length of the corresponding file
+     * @throws IOException
+     */
+    public Reader(FSDataInputStream fin, long fileLength, Configuration conf) throws IOException {
+      this.in = fin;
+      this.conf = conf;
+      
+      // move the cursor to the beginning of the tail, containing: offset to the
+      // meta block index, version and magic
+      fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE);
+      long offsetIndexMeta = fin.readLong();
+      version = new Version(fin);
+      Magic.readAndVerify(fin);
+      
+      if (!version.compatibleWith(BCFile.API_VERSION)) {
+        throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+      }
+      
+      // read meta index
+      fin.seek(offsetIndexMeta);
+      metaIndex = new MetaIndex(fin);
+      
+      // read data:BCFile.index, the data block index
+      BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
+      try {
+        dataIndex = new DataIndex(blockR);
+      } finally {
+        blockR.close();
+      }
+    }
+    
+    public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf) throws IOException {
+      this.in = fin;
+      this.conf = conf;
+      
+      BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
+      BlockRead cachedDataIndex = cache.getCachedMetaBlock(DataIndex.BLOCK_NAME);
+      
+      if (cachedMetaIndex == null || cachedDataIndex == null) {
+        // move the cursor to the beginning of the tail, containing: offset to the
+        // meta block index, version and magic
+        fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE);
+        long offsetIndexMeta = fin.readLong();
+        version = new Version(fin);
+        Magic.readAndVerify(fin);
+        
+        if (!version.compatibleWith(BCFile.API_VERSION)) {
+          throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+        }
+        
+        // read meta index
+        fin.seek(offsetIndexMeta);
+        metaIndex = new MetaIndex(fin);
+        if (cachedMetaIndex == null) {
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          DataOutputStream dos = new DataOutputStream(baos);
+          metaIndex.write(dos);
+          dos.close();
+          cache.cacheMetaBlock(META_NAME, baos.toByteArray());
+        }
+        
+        // read data:BCFile.index, the data block index
+        if (cachedDataIndex == null) {
+          BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
+          cachedDataIndex = cache.cacheMetaBlock(DataIndex.BLOCK_NAME, blockR);
+        }
+        
+        dataIndex = new DataIndex(cachedDataIndex);
+        cachedDataIndex.close();
+        
+      } else {
+        // Logger.getLogger(Reader.class).debug("Read bcfile !METADATA from cache");
+        version = null;
+        metaIndex = new MetaIndex(cachedMetaIndex);
+        dataIndex = new DataIndex(cachedDataIndex);
+      }
+    }
+    
+    /**
+     * Get the name of the default compression algorithm.
+     * 
+     * @return the name of the default compression algorithm.
+     */
+    public String getDefaultCompressionName() {
+      return dataIndex.getDefaultCompressionAlgorithm().getName();
+    }
+    
+    /**
+     * Get version of BCFile file being read.
+     * 
+     * @return version of BCFile file being read.
+     */
+    public Version getBCFileVersion() {
+      return version;
+    }
+    
+    /**
+     * Get version of BCFile API.
+     * 
+     * @return version of BCFile API.
+     */
+    public Version getAPIVersion() {
+      return API_VERSION;
+    }
+    
+    /**
+     * Finish reading the BCFile and release all resources.
+     */
+    public void close() {
+      // nothing to be done now
+    }
+    
+    /**
+     * Get the number of data blocks.
+     * 
+     * @return the number of data blocks.
+     */
+    public int getBlockCount() {
+      return dataIndex.getBlockRegionList().size();
+    }
+    
+    /**
+     * Stream access to a Meta Block.
+     * 
+     * @param name
+     *          meta block name
+     * @return BlockReader input stream for reading the meta block.
+     * @throws IOException
+     * @throws MetaBlockDoesNotExist
+     *           The Meta Block with the given name does not exist.
+     */
+    public BlockReader getMetaBlock(String name) throws IOException, MetaBlockDoesNotExist {
+      MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
+      if (imeBCIndex == null) {
+        throw new MetaBlockDoesNotExist("name=" + name);
+      }
+      
+      BlockRegion region = imeBCIndex.getRegion();
+      return createReader(imeBCIndex.getCompressionAlgorithm(), region);
+    }
+    
+    /**
+     * Stream access to a Data Block.
+     * 
+     * @param blockIndex
+     *          0-based data block index.
+     * @return BlockReader input stream for reading the data block.
+     * @throws IOException
+     */
+    public BlockReader getDataBlock(int blockIndex) throws IOException {
+      if (blockIndex < 0 || blockIndex >= getBlockCount()) {
+        throw new IndexOutOfBoundsException(String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
+      }
+      
+      BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
+      return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
+    }
+    
+    public BlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException {
+      BlockRegion region = new BlockRegion(offset, compressedSize, rawSize);
+      return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
+    }
+    
+    private BlockReader createReader(Algorithm compressAlgo, BlockRegion region) throws IOException {
+      RBlockState rbs = new RBlockState(compressAlgo, in, region, conf);
+      return new BlockReader(rbs);
+    }
+    
+    /**
+     * Find the smallest Block index whose starting offset is greater than or equal to the specified offset.
+     * 
+     * @param offset
+     *          User-specific offset.
+     * @return the index to the data Block if such block exists; or -1 otherwise.
+     */
+    public int getBlockIndexNear(long offset) {
+      ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
+      int idx = Utils.lowerBound(list, new ScalarLong(offset), new ScalarComparator());
+      
+      if (idx == list.size()) {
+        return -1;
+      }
+      
+      return idx;
+    }
+  }
+  
+  /**
+   * Index for all Meta blocks.
+   */
+  static class MetaIndex {
+    // use a tree map, for getting a meta block entry by name
+    final Map<String,MetaIndexEntry> index;
+    
+    // for write
+    public MetaIndex() {
+      index = new TreeMap<String,MetaIndexEntry>();
+    }
+    
+    // for read, construct the map from the file
+    public MetaIndex(DataInput in) throws IOException {
+      int count = Utils.readVInt(in);
+      index = new TreeMap<String,MetaIndexEntry>();
+      
+      for (int nx = 0; nx < count; nx++) {
+        MetaIndexEntry indexEntry = new MetaIndexEntry(in);
+        index.put(indexEntry.getMetaName(), indexEntry);
+      }
+    }
+    
+    public void addEntry(MetaIndexEntry indexEntry) {
+      index.put(indexEntry.getMetaName(), indexEntry);
+    }
+    
+    public MetaIndexEntry getMetaByName(String name) {
+      return index.get(name);
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      Utils.writeVInt(out, index.size());
+      
+      for (MetaIndexEntry indexEntry : index.values()) {
+        indexEntry.write(out);
+      }
+    }
+  }
+  
+  /**
+   * An entry describes a meta block in the MetaIndex.
+   */
+  static final class MetaIndexEntry {
+    private final String metaName;
+    private final Algorithm compressionAlgorithm;
+    private final static String defaultPrefix = "data:";
+    
+    private final BlockRegion region;
+    
+    public MetaIndexEntry(DataInput in) throws IOException {
+      String fullMetaName = Utils.readString(in);
+      if (fullMetaName.startsWith(defaultPrefix)) {
+        metaName = fullMetaName.substring(defaultPrefix.length(), fullMetaName.length());
+      } else {
+        throw new IOException("Corrupted Meta region Index");
+      }
+      
+      compressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
+      region = new BlockRegion(in);
+    }
+    
+    public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm, BlockRegion region) {
+      this.metaName = metaName;
+      this.compressionAlgorithm = compressionAlgorithm;
+      this.region = region;
+    }
+    
+    public String getMetaName() {
+      return metaName;
+    }
+    
+    public Algorithm getCompressionAlgorithm() {
+      return compressionAlgorithm;
+    }
+    
+    public BlockRegion getRegion() {
+      return region;
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      Utils.writeString(out, defaultPrefix + metaName);
+      Utils.writeString(out, compressionAlgorithm.getName());
+      
+      region.write(out);
+    }
+  }
+  
+  /**
+   * Index of all compressed data blocks.
+   */
+  static class DataIndex {
+    final static String BLOCK_NAME = "BCFile.index";
+    
+    private final Algorithm defaultCompressionAlgorithm;
+    
+    // for data blocks, each entry specifies a block's offset, compressed size
+    // and raw size
+    private final ArrayList<BlockRegion> listRegions;
+    
+    private boolean trackBlocks;
+    
+    // for read, deserialized from a file
+    public DataIndex(DataInput in) throws IOException {
+      defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
+      
+      int n = Utils.readVInt(in);
+      listRegions = new ArrayList<BlockRegion>(n);
+      
+      for (int i = 0; i < n; i++) {
+        BlockRegion region = new BlockRegion(in);
+        listRegions.add(region);
+      }
+    }
+    
+    // for write
+    public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) {
+      this.trackBlocks = trackBlocks;
+      this.defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
+      listRegions = new ArrayList<BlockRegion>();
+    }
+    
+    public Algorithm getDefaultCompressionAlgorithm() {
+      return defaultCompressionAlgorithm;
+    }
+    
+    public ArrayList<BlockRegion> getBlockRegionList() {
+      return listRegions;
+    }
+    
+    public void addBlockRegion(BlockRegion region) {
+      if (trackBlocks)
+        listRegions.add(region);
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      Utils.writeString(out, defaultCompressionAlgorithm.getName());
+      
+      Utils.writeVInt(out, listRegions.size());
+      
+      for (BlockRegion region : listRegions) {
+        region.write(out);
+      }
+    }
+  }
+  
+  /**
+   * Magic number uniquely identifying a BCFile in the header/footer.
+   */
+  static final class Magic {
+    private final static byte[] AB_MAGIC_BCFILE = {
+        // ... total of 16 bytes
+        (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91, (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf, (byte) 0x41,
+        (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1, (byte) 0x50};
+    
+    public static void readAndVerify(DataInput in) throws IOException {
+      byte[] abMagic = new byte[size()];
+      in.readFully(abMagic);
+      
+      // check against AB_MAGIC_BCFILE, if not matching, throw an
+      // Exception
+      if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
+        throw new IOException("Not a valid BCFile.");
+      }
+    }
+    
+    public static void write(DataOutput out) throws IOException {
+      out.write(AB_MAGIC_BCFILE);
+    }
+    
+    public static int size() {
+      return AB_MAGIC_BCFILE.length;
+    }
+  }
+  
+  /**
+   * Block region.
+   */
+  static final class BlockRegion implements Scalar {
+    private final long offset;
+    private final long compressedSize;
+    private final long rawSize;
+    
+    public BlockRegion(DataInput in) throws IOException {
+      offset = Utils.readVLong(in);
+      compressedSize = Utils.readVLong(in);
+      rawSize = Utils.readVLong(in);
+    }
+    
+    public BlockRegion(long offset, long compressedSize, long rawSize) {
+      this.offset = offset;
+      this.compressedSize = compressedSize;
+      this.rawSize = rawSize;
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      Utils.writeVLong(out, offset);
+      Utils.writeVLong(out, compressedSize);
+      Utils.writeVLong(out, rawSize);
+    }
+    
+    public long getOffset() {
+      return offset;
+    }
+    
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+    
+    public long getRawSize() {
+      return rawSize;
+    }
+    
+    @Override
+    public long magnitude() {
+      return offset;
+    }
+  }
+}
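
The index structures above boil down to a very small on-disk layout: a BlockRegion is just three VLong fields (offset, compressed size, raw size) written back to back, and DataIndex prefixes its list of regions with the default compression algorithm name and a VInt count. The sketch below is hypothetical and not part of this commit; it assumes it is compiled into the same org.apache.accumulo.core.file.rfile.bcfile package so it can use the package's Utils VLong helpers that the code above relies on.

    package org.apache.accumulo.core.file.rfile.bcfile;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical sketch: write and read one BlockRegion-style record,
    // i.e. offset, compressed size and raw size, each encoded as a VLong.
    class BlockRegionLayoutDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        Utils.writeVLong(out, 1024L); // offset
        Utils.writeVLong(out, 200L);  // compressed size
        Utils.writeVLong(out, 512L);  // raw size
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        long offset = Utils.readVLong(in);
        long compressedSize = Utils.readVLong(in);
        long rawSize = Utils.readVLong(in);
        System.out.println(offset + " " + compressedSize + " " + rawSize); // prints "1024 200 512"
      }
    }
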

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedByteArrayOutputStream.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedByteArrayOutputStream.java
new file mode 100644
index 0000000..3484350
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedByteArrayOutputStream.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A byte array backed output stream with a limit. The limit must not exceed the buffer capacity. The object can be reused through the <code>reset</code> API,
+ * choosing a different limit for each round.
+ */
+class BoundedByteArrayOutputStream extends OutputStream {
+  private final byte[] buffer;
+  private int limit;
+  private int count;
+  
+  public BoundedByteArrayOutputStream(int capacity) {
+    this(capacity, capacity);
+  }
+  
+  public BoundedByteArrayOutputStream(int capacity, int limit) {
+    if ((capacity < limit) || (capacity | limit) < 0) {
+      throw new IllegalArgumentException("Invalid capacity/limit");
+    }
+    this.buffer = new byte[capacity];
+    this.limit = limit;
+    this.count = 0;
+  }
+  
+  @Override
+  public void write(int b) throws IOException {
+    if (count >= limit) {
+      throw new EOFException("Reaching the limit of the buffer.");
+    }
+    buffer[count++] = (byte) b;
+  }
+  
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    
+    if (count + len > limit) {
+      throw new EOFException("Reach the limit of the buffer");
+    }
+    
+    System.arraycopy(b, off, buffer, count, len);
+    count += len;
+  }
+  
+  public void reset(int newlim) {
+    if (newlim > buffer.length) {
+      throw new IndexOutOfBoundsException("Limit exceeds buffer size");
+    }
+    this.limit = newlim;
+    this.count = 0;
+  }
+  
+  public void reset() {
+    this.limit = buffer.length;
+    this.count = 0;
+  }
+  
+  public int getLimit() {
+    return limit;
+  }
+  
+  public byte[] getBuffer() {
+    return buffer;
+  }
+  
+  public int size() {
+    return count;
+  }
+}
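
A short usage sketch (hypothetical, not part of this commit) of the reset/limit behaviour described in the class javadoc; the demo class has to sit in the same package because BoundedByteArrayOutputStream is package-private.

    package org.apache.accumulo.core.file.rfile.bcfile;

    import java.io.IOException;

    // Hypothetical sketch: reuse one fixed buffer with a different limit per round.
    class BoundedByteArrayOutputStreamDemo {
      public static void main(String[] args) throws IOException {
        BoundedByteArrayOutputStream out = new BoundedByteArrayOutputStream(4096);

        out.reset(100);                    // this round may hold at most 100 bytes
        out.write(new byte[60]);           // fine: 60 <= 100; exceeding the limit throws EOFException
        byte[] backing = out.getBuffer();  // shared backing array; only the first size() bytes are valid
        System.out.println(out.size() + " of " + out.getLimit() + " in a " + backing.length + "-byte buffer");
        // prints "60 of 100 in a 4096-byte buffer"

        out.reset();                       // next round: limit restored to the full 4096-byte capacity
      }
    }
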

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
new file mode 100644
index 0000000..f8f4886
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+/**
+ * BoundedRangeFileInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple
+ * BoundedRangeFileInputStreams on top of the same FSDataInputStream, and they will not interfere with each other.
+ */
+class BoundedRangeFileInputStream extends InputStream {
+  
+  private FSDataInputStream in;
+  private long pos;
+  private long end;
+  private long mark;
+  private final byte[] oneByte = new byte[1];
+  
+  /**
+   * Constructor
+   * 
+   * @param in
+   *          The FSDataInputStream we connect to.
+   * @param offset
+   *          Beginning offset of the region.
+   * @param length
+   *          Length of the region.
+   * 
+   *          The actual length of the region may be smaller if (offset + length) goes beyond the end of the FS input stream.
+   */
+  public BoundedRangeFileInputStream(FSDataInputStream in, long offset, long length) {
+    if (offset < 0 || length < 0) {
+      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
+    }
+    
+    this.in = in;
+    this.pos = offset;
+    this.end = offset + length;
+    this.mark = -1;
+  }
+  
+  @Override
+  public int available() throws IOException {
+    int avail = in.available();
+    if (pos + avail > end) {
+      avail = (int) (end - pos);
+    }
+    
+    return avail;
+  }
+  
+  @Override
+  public int read() throws IOException {
+    int ret = read(oneByte);
+    if (ret == 1)
+      return oneByte[0] & 0xff;
+    return -1;
+  }
+  
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+  
+  @Override
+  public int read(final byte[] b, final int off, int len) throws IOException {
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    
+    final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
+    if (n == 0)
+      return -1;
+    Integer ret = 0;
+    synchronized (in) {
+      in.seek(pos);
+      try {
+        ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
+          @Override
+          public Integer run() throws IOException {
+            int ret = 0;
+            ret = in.read(b, off, n);
+            return ret;
+          }
+        });
+      } catch (PrivilegedActionException e) {
+        throw (IOException) e.getException();
+      }
+    }
+    if (ret < 0) {
+      end = pos;
+      return -1;
+    }
+    pos += ret;
+    return ret;
+  }
+  
+  @Override
+  /*
+   * We may skip beyond the end of the file.
+   */
+  public long skip(long n) throws IOException {
+    long len = Math.min(n, end - pos);
+    pos += len;
+    return len;
+  }
+  
+  @Override
+  public void mark(int readlimit) {
+    mark = pos;
+  }
+  
+  @Override
+  public void reset() throws IOException {
+    if (mark < 0)
+      throw new IOException("Resetting to invalid mark");
+    pos = mark;
+  }
+  
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+  
+  @Override
+  public void close() {
+    // Invalidate the state of the stream.
+    in = null;
+    pos = end;
+    mark = -1;
+  }
+}
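
The sketch below (hypothetical, not part of this commit) shows the intended usage: several independent range views over one shared FSDataInputStream. The path is a placeholder, the file is assumed to be at least 150 bytes long, and the demo must live in the same package because the class is package-private.

    package org.apache.accumulo.core.file.rfile.bcfile;

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch: two bounded views over the same underlying stream.
    class BoundedRangeDemo {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream raw = fs.open(new Path("/tmp/example.rf")); // placeholder path

        // Views of byte ranges [0, 100) and [100, 150); each read seeks the shared
        // stream inside a synchronized block, so the views do not interfere.
        BoundedRangeFileInputStream header = new BoundedRangeFileInputStream(raw, 0, 100);
        BoundedRangeFileInputStream body = new BoundedRangeFileInputStream(raw, 100, 50);

        byte[] first = new byte[100];
        new DataInputStream(header).readFully(first); // reads only bytes 0..99

        header.close(); // invalidates this view but does not close the shared stream
        body.close();
        raw.close();
      }
    }
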

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
new file mode 100644
index 0000000..2e39289
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * Adaptor class to wrap byte-array-backed objects (including plain Java byte arrays) as RawComparable objects.
+ */
+public final class ByteArray implements RawComparable {
+  private final byte[] buffer;
+  private final int offset;
+  private final int len;
+  
+  /**
+   * Construct a ByteArray from a {@link BytesWritable}.
+   * 
+   * @param other
+   *          the BytesWritable to wrap.
+   */
+  public ByteArray(BytesWritable other) {
+    this(other.getBytes(), 0, other.getLength());
+  }
+  
+  /**
+   * Wrap a whole byte array as a RawComparable.
+   * 
+   * @param buffer
+   *          the byte array buffer.
+   */
+  public ByteArray(byte[] buffer) {
+    this(buffer, 0, buffer.length);
+  }
+  
+  /**
+   * Wrap a partial byte array as a RawComparable.
+   * 
+   * @param buffer
+   *          the byte array buffer.
+   * @param offset
+   *          the starting offset
+   * @param len
+   *          the length of the consecutive bytes to be wrapped.
+   */
+  public ByteArray(byte[] buffer, int offset, int len) {
+    if ((offset | len | (buffer.length - offset - len)) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    this.buffer = buffer;
+    this.offset = offset;
+    this.len = len;
+  }
+  
+  /**
+   * @return the underlying buffer.
+   */
+  @Override
+  public byte[] buffer() {
+    return buffer;
+  }
+  
+  /**
+   * @return the offset in the buffer.
+   */
+  @Override
+  public int offset() {
+    return offset;
+  }
+  
+  /**
+   * @return the size of the byte array.
+   */
+  @Override
+  public int size() {
+    return len;
+  }
+}
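
A quick illustration (hypothetical, not part of this commit) of the three wrapping constructors; all of them expose the same (buffer, offset, size) view through RawComparable.

    package org.apache.accumulo.core.file.rfile.bcfile;

    import org.apache.hadoop.io.BytesWritable;

    // Hypothetical sketch: wrap a whole array, a sub-range and a BytesWritable.
    class ByteArrayDemo {
      public static void main(String[] args) {
        byte[] raw = {'r', 'o', 'w', '0', '0', '1'};

        ByteArray whole = new ByteArray(raw);                           // bytes 0..5
        ByteArray suffix = new ByteArray(raw, 3, 3);                    // bytes 3..5, i.e. "001"
        ByteArray fromWritable = new ByteArray(new BytesWritable(raw));

        System.out.println(whole.size() + " " + suffix.offset() + " " + fromWritable.size());
        // prints "6 3 6"
      }
    }
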

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
new file mode 100644
index 0000000..7090c7e
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Several related classes to support chunk-encoded sub-streams on top of a regular stream.
+ */
+final class Chunk {
+  
+  /**
+   * Prevent instantiation of this class.
+   */
+  private Chunk() {
+    // nothing
+  }
+  
+  /**
+   * Decoding a chain of chunks encoded through ChunkEncoder or SingleChunkEncoder.
+   */
+  static public class ChunkDecoder extends InputStream {
+    private DataInputStream in = null;
+    private boolean lastChunk;
+    private int remain = 0;
+    private boolean closed;
+    
+    public ChunkDecoder() {
+      lastChunk = true;
+      closed = true;
+    }
+    
+    public void reset(DataInputStream downStream) {
+      // no need to wind forward the old input.
+      in = downStream;
+      lastChunk = false;
+      remain = 0;
+      closed = false;
+    }
+    
+    /**
+     * Constructor
+     * 
+     * @param in
+     *          The source input stream which contains chunk-encoded data stream.
+     */
+    public ChunkDecoder(DataInputStream in) {
+      this.in = in;
+      lastChunk = false;
+      closed = false;
+    }
+    
+    /**
+     * Have we reached the last chunk?
+     * 
+     * @return true if we have reached the last chunk.
+     * @throws java.io.IOException
+     */
+    public boolean isLastChunk() throws IOException {
+      checkEOF();
+      return lastChunk;
+    }
+    
+    /**
+     * How many bytes remain in the current chunk?
+     * 
+     * @return remaining bytes left in the current chunk.
+     * @throws java.io.IOException
+     */
+    public int getRemain() throws IOException {
+      checkEOF();
+      return remain;
+    }
+    
+    /**
+     * Read the length of the next chunk.
+     * 
+     * @throws java.io.IOException
+     *           when no more data is available.
+     */
+    private void readLength() throws IOException {
+      remain = Utils.readVInt(in);
+      if (remain >= 0) {
+        lastChunk = true;
+      } else {
+        remain = -remain;
+      }
+    }
+    
+    /**
+     * Check whether we have reached the end of the stream.
+     * 
+     * @return false if the chunk encoded stream has more data to read (in which case available() will be greater than 0); true otherwise.
+     * @throws java.io.IOException
+     *           on I/O errors.
+     */
+    private boolean checkEOF() throws IOException {
+      if (isClosed())
+        return true;
+      while (true) {
+        if (remain > 0)
+          return false;
+        if (lastChunk)
+          return true;
+        readLength();
+      }
+    }
+    
+    @Override
+    /*
+     * This method never blocks the caller. Returning 0 does not mean we reach the end of the stream.
+     */
+    public int available() {
+      return remain;
+    }
+    
+    @Override
+    public int read() throws IOException {
+      if (checkEOF())
+        return -1;
+      int ret = in.read();
+      if (ret < 0)
+        throw new IOException("Corrupted chunk encoding stream");
+      --remain;
+      return ret;
+    }
+    
+    @Override
+    public int read(byte[] b) throws IOException {
+      return read(b, 0, b.length);
+    }
+    
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+        throw new IndexOutOfBoundsException();
+      }
+      
+      if (!checkEOF()) {
+        int n = Math.min(remain, len);
+        int ret = in.read(b, off, n);
+        if (ret < 0)
+          throw new IOException("Corrupted chunk encoding stream");
+        remain -= ret;
+        return ret;
+      }
+      return -1;
+    }
+    
+    @Override
+    public long skip(long n) throws IOException {
+      if (!checkEOF()) {
+        long ret = in.skip(Math.min(remain, n));
+        remain -= ret;
+        return ret;
+      }
+      return 0;
+    }
+    
+    @Override
+    public boolean markSupported() {
+      return false;
+    }
+    
+    public boolean isClosed() {
+      return closed;
+    }
+    
+    @Override
+    public void close() throws IOException {
+      if (closed == false) {
+        try {
+          while (!checkEOF()) {
+            skip(Integer.MAX_VALUE);
+          }
+        } finally {
+          closed = true;
+        }
+      }
+    }
+  }
+  
+  /**
+   * Chunk Encoder. Encodes the output data into a chain of chunks in the following sequence: -len1, byte[len1], -len2, byte[len2], ..., len_n, byte[len_n],
+   * where len1, len2, ..., len_n are the lengths of the data chunks. Non-terminal chunks have their lengths negated and cannot have length 0.
+   * All lengths are in the range of 0 to Integer.MAX_VALUE and are encoded in Utils.VInt format.
+   */
+  static public class ChunkEncoder extends OutputStream {
+    /**
+     * The data output stream it connects to.
+     */
+    private DataOutputStream out;
+    
+    /**
+     * The internal buffer that is only used when we do not know the advertised size.
+     */
+    private byte buf[];
+    
+    /**
+     * The number of valid bytes in the buffer. This value is always in the range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt> through
+     * <tt>buf[count-1]</tt> contain valid byte data.
+     */
+    private int count;
+    
+    /**
+     * Constructor.
+     * 
+     * @param out
+     *          the underlying output stream.
+     * @param buf
+     *          user-supplied buffer. The buffer would be used exclusively by the ChunkEncoder during its life cycle.
+     */
+    public ChunkEncoder(DataOutputStream out, byte[] buf) {
+      this.out = out;
+      this.buf = buf;
+      this.count = 0;
+    }
+    
+    /**
+     * Write out a chunk.
+     * 
+     * @param chunk
+     *          The chunk buffer.
+     * @param offset
+     *          Offset to chunk buffer for the beginning of chunk.
+     * @param len
+     *          Length of the chunk.
+     * @param last
+     *          Is this the last call to flushBuffer?
+     */
+    private void writeChunk(byte[] chunk, int offset, int len, boolean last) throws IOException {
+      if (last) { // always write out the length for the last chunk.
+        Utils.writeVInt(out, len);
+        if (len > 0) {
+          out.write(chunk, offset, len);
+        }
+      } else {
+        if (len > 0) {
+          Utils.writeVInt(out, -len);
+          out.write(chunk, offset, len);
+        }
+      }
+    }
+    
+    /**
+     * Write out a chunk that is a concatenation of the internal buffer plus user supplied data. This will never be the last block.
+     * 
+     * @param data
+     *          User supplied data buffer.
+     * @param offset
+     *          Offset to user data buffer.
+     * @param len
+     *          User data buffer size.
+     */
+    private void writeBufData(byte[] data, int offset, int len) throws IOException {
+      if (count + len > 0) {
+        Utils.writeVInt(out, -(count + len));
+        out.write(buf, 0, count);
+        count = 0;
+        out.write(data, offset, len);
+      }
+    }
+    
+    /**
+     * Flush the internal buffer.
+     * 
+     * @throws java.io.IOException
+     */
+    private void flushBuffer() throws IOException {
+      if (count > 0) {
+        writeChunk(buf, 0, count, false);
+        count = 0;
+      }
+    }
+    
+    @Override
+    public void write(int b) throws IOException {
+      if (count >= buf.length) {
+        flushBuffer();
+      }
+      buf[count++] = (byte) b;
+    }
+    
+    @Override
+    public void write(byte b[]) throws IOException {
+      write(b, 0, b.length);
+    }
+    
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      if ((len + count) >= buf.length) {
+        /*
+         * If the input data do not fit in buffer, flush the output buffer and then write the data directly. In this way buffered streams will cascade
+         * harmlessly.
+         */
+        writeBufData(b, off, len);
+        return;
+      }
+      
+      System.arraycopy(b, off, buf, count, len);
+      count += len;
+    }
+    
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+      out.flush();
+    }
+    
+    @Override
+    public void close() throws IOException {
+      if (buf != null) {
+        try {
+          writeChunk(buf, 0, count, true);
+        } finally {
+          buf = null;
+          out = null;
+        }
+      }
+    }
+  }
+  
+  /**
+   * Encode the whole stream as a single chunk. The size of the chunk must be known up front.
+   */
+  static public class SingleChunkEncoder extends OutputStream {
+    /**
+     * The data output stream it connects to.
+     */
+    private final DataOutputStream out;
+    
+    /**
+     * The remaining bytes to be written.
+     */
+    private int remain;
+    private boolean closed = false;
+    
+    /**
+     * Constructor.
+     * 
+     * @param out
+     *          the underlying output stream.
+     * @param size
+     *          The total # of bytes to be written as a single chunk.
+     * @throws java.io.IOException
+     *           if an I/O error occurs.
+     */
+    public SingleChunkEncoder(DataOutputStream out, int size) throws IOException {
+      this.out = out;
+      this.remain = size;
+      Utils.writeVInt(out, size);
+    }
+    
+    @Override
+    public void write(int b) throws IOException {
+      if (remain > 0) {
+        out.write(b);
+        --remain;
+      } else {
+        throw new IOException("Writing more bytes than advertised size.");
+      }
+    }
+    
+    @Override
+    public void write(byte b[]) throws IOException {
+      write(b, 0, b.length);
+    }
+    
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      if (remain >= len) {
+        out.write(b, off, len);
+        remain -= len;
+      } else {
+        throw new IOException("Writing more bytes than advertised size.");
+      }
+    }
+    
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+    
+    @Override
+    public void close() throws IOException {
+      if (closed == true) {
+        return;
+      }
+      
+      try {
+        if (remain > 0) {
+          throw new IOException("Writing less bytes than advertised size.");
+        }
+      } finally {
+        closed = true;
+      }
+    }
+  }
+}
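
The encoding that ChunkEncoder's javadoc describes can be exercised end to end in a few lines. The sketch below (hypothetical, not part of this commit) pushes a payload larger than the encoder's internal buffer through a chain of chunks and reads it back with ChunkDecoder; it assumes placement in the same package since Chunk is package-private.

    package org.apache.accumulo.core.file.rfile.bcfile;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical sketch: round-trip a payload through the chunk encoding
    // (-len, bytes, ..., +len, bytes for the terminal chunk).
    class ChunkDemo {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Chunk.ChunkEncoder encoder = new Chunk.ChunkEncoder(new DataOutputStream(bytes), new byte[8]);

        encoder.write("hello, chunked world".getBytes("UTF-8")); // larger than the 8-byte buffer
        encoder.close();                                         // emits the terminal, non-negated length

        Chunk.ChunkDecoder decoder =
            new Chunk.ChunkDecoder(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        int b;
        while ((b = decoder.read()) != -1) {
          decoded.write(b);
        }
        System.out.println(decoded.toString("UTF-8")); // prints "hello, chunked world"
      }
    }
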

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java
new file mode 100644
index 0000000..74a3767
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/CompareUtils.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+class CompareUtils {
+  /**
+   * Prevent instantiation of this class.
+   */
+  private CompareUtils() {
+    // nothing
+  }
+  
+  /**
+   * A comparator to compare anything that implements {@link RawComparable} using a customized comparator.
+   */
+  public static final class BytesComparator implements Comparator<RawComparable> {
+    private RawComparator<Object> cmp;
+    
+    public BytesComparator(RawComparator<Object> cmp) {
+      this.cmp = cmp;
+    }
+    
+    @Override
+    public int compare(RawComparable o1, RawComparable o2) {
+      return compare(o1.buffer(), o1.offset(), o1.size(), o2.buffer(), o2.offset(), o2.size());
+    }
+    
+    public int compare(byte[] a, int off1, int len1, byte[] b, int off2, int len2) {
+      return cmp.compare(a, off1, len1, b, off2, len2);
+    }
+  }
+  
+  /**
+   * Interface for all objects that have a single integer magnitude.
+   */
+  static interface Scalar {
+    long magnitude();
+  }
+  
+  static final class ScalarLong implements Scalar {
+    private long magnitude;
+    
+    public ScalarLong(long m) {
+      magnitude = m;
+    }
+    
+    public long magnitude() {
+      return magnitude;
+    }
+  }
+  
+  public static final class ScalarComparator implements Comparator<Scalar> {
+    @Override
+    public int compare(Scalar o1, Scalar o2) {
+      long diff = o1.magnitude() - o2.magnitude();
+      if (diff < 0)
+        return -1;
+      if (diff > 0)
+        return 1;
+      return 0;
+    }
+  }
+  
+  public static final class MemcmpRawComparator implements RawComparator<Object> {
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+    }
+    
+    @Override
+    public int compare(Object o1, Object o2) {
+      throw new RuntimeException("Object comparison not supported");
+    }
+  }
+}
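
To round things off, a small sketch (hypothetical, not part of this commit) of how these helpers compose: MemcmpRawComparator supplies byte-level ordering and BytesComparator lifts it to RawComparable values such as the ByteArray class above. The demo must sit in the same package because CompareUtils is package-private.

    package org.apache.accumulo.core.file.rfile.bcfile;

    import org.apache.hadoop.io.RawComparator;

    // Hypothetical sketch: lexicographic (memcmp-style) ordering of two RawComparable values.
    class CompareUtilsDemo {
      public static void main(String[] args) {
        RawComparator<Object> memcmp = new CompareUtils.MemcmpRawComparator();
        CompareUtils.BytesComparator cmp = new CompareUtils.BytesComparator(memcmp);

        ByteArray a = new ByteArray("row1".getBytes());
        ByteArray b = new ByteArray("row2".getBytes());

        System.out.println(cmp.compare(a, b) < 0); // prints "true": "row1" sorts before "row2"
      }
    }
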

