accumulo-commits mailing list archives

From ctubb...@apache.org
Subject [26/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 21:59:51 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
new file mode 100644
index 0000000..267f805
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.map;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.MapFileIterator;
+import org.apache.accumulo.core.iterators.system.SequenceFileIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+
+public class MapFileOperations extends FileOperations {
+  
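+  // Wraps a file reader and clips iteration at the end key of the most recent seek Range;
+  // getTopKey/getTopValue/next throw IllegalStateException once the range is exhausted.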
+  public static class RangeIterator implements FileSKVIterator {
+    
+    SortedKeyValueIterator<Key,Value> reader;
+    private Range range;
+    private boolean hasTop;
+    
+    public RangeIterator(SortedKeyValueIterator<Key,Value> reader) {
+      this.reader = reader;
+    }
+    
+    @Override
+    public void close() throws IOException {
+      ((FileSKVIterator) reader).close();
+    }
+    
+    @Override
+    public Key getFirstKey() throws IOException {
+      return ((FileSKVIterator) reader).getFirstKey();
+    }
+    
+    @Override
+    public Key getLastKey() throws IOException {
+      return ((FileSKVIterator) reader).getLastKey();
+    }
+    
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException {
+      return ((FileSKVIterator) reader).getMetaStore(name);
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      return new RangeIterator(reader.deepCopy(env));
+    }
+    
+    @Override
+    public Key getTopKey() {
+      if (!hasTop)
+        throw new IllegalStateException();
+      return reader.getTopKey();
+    }
+    
+    @Override
+    public Value getTopValue() {
+      if (!hasTop)
+        throw new IllegalStateException();
+      return reader.getTopValue();
+    }
+    
+    @Override
+    public boolean hasTop() {
+      return hasTop;
+    }
+    
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void next() throws IOException {
+      if (!hasTop)
+        throw new IllegalStateException();
+      reader.next();
+      hasTop = reader.hasTop() && !range.afterEndKey(reader.getTopKey());
+    }
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+      reader.seek(range, columnFamilies, inclusive);
+      this.range = range;
+      
+      hasTop = reader.hasTop() && !range.afterEndKey(reader.getTopKey());
+      
+      while (hasTop() && range.beforeStartKey(getTopKey())) {
+        next();
+      }
+    }
+    
+    @Override
+    public void closeDeepCopies() throws IOException {
+      ((FileSKVIterator) reader).closeDeepCopies();
+    }
+    
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      ((FileSKVIterator) reader).setInterruptFlag(flag);
+    }
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    FileSKVIterator iter = new RangeIterator(new MapFileIterator(acuconf, fs, file, conf));
+    
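+    // seek over the whole file: from the smallest possible key to an unbounded end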
+    if (seekToBeginning)
+      iter.seek(new Range(new Key(), null), new ArrayList<ByteSequence>(), false);
+    
+    return iter;
+  }
+  
+  @Override
+  public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return new SequenceFileIterator(MapFileUtil.openIndex(conf, fs, new Path(file)), false);
+  }
+  
+  @Override
+  public long getFileSize(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+    return fs.getFileStatus(new Path(file + "/" + MapFile.DATA_FILE_NAME)).getLen();
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf) throws IOException {
+    MapFileIterator mfIter = new MapFileIterator(tableConf, fs, file, conf);
+    
+    FileSKVIterator iter = new RangeIterator(mfIter);
+    
+    iter.seek(range, columnFamilies, inclusive);
+    
+    return iter;
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
+      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+    
+    return openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf);
+  }
+  
+  @Override
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
+      BlockCache dataCache, BlockCache indexCache) throws IOException {
+    
+    return openReader(file, seekToBeginning, fs, conf, acuconf);
+  }
+  
+  @Override
+  public FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, BlockCache dCache, BlockCache iCache)
+      throws IOException {
+    
+    return openIndex(file, fs, conf, acuconf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
new file mode 100644
index 0000000..d776ebe
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/map/MapFileUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.map;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+
+public class MapFileUtil {
+  public static MapFile.Reader openMapFile(AccumuloConfiguration acuconf, FileSystem fs, String dirName, Configuration conf) throws IOException {
+    return new MapFile.Reader(fs, dirName, conf);
+  }
+  
+  public static SequenceFile.Reader openIndex(Configuration conf, FileSystem fs, Path mapFile) throws IOException {
+    Path indexPath = new Path(mapFile, MapFile.INDEX_FILE_NAME);
+    return new SequenceFile.Reader(fs, indexPath, conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
new file mode 100644
index 0000000..3745e33
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
+
+/**
+ * An in-memory index over a cached data block, built lazily as the block is accessed
+ * repeatedly, so that seeks can jump close to a key instead of scanning the whole block.
+ */
+public class BlockIndex {
+  
+  public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
+    
+    BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
+    
+    int accessCount = blockIndex.accessCount.incrementAndGet();
+    
+    // rebuild the index each time the access count doubles, so frequently read blocks
+    // get a progressively denser index; 1 is a power of two, but an index is not worth
+    // building on the first access
+    if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
+      blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
+    }
+    
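+    // blockIndex stays null until buildIndex has run; a null return means no index is available yet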
+    if (blockIndex.blockIndex != null)
+      return blockIndex;
+
+    return null;
+  }
+  
+  private static boolean isPowerOfTwo(int x) {
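+    // x & (x - 1) clears the lowest set bit, so the result is zero only when x is a power of two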
+    return ((x > 0) && (x & (x - 1)) == 0);
+  }
+  
+  private AtomicInteger accessCount = new AtomicInteger(0);
+  private volatile BlockIndexEntry[] blockIndex = null;
+
+  public static class BlockIndexEntry implements Comparable<BlockIndexEntry> {
+    
+    private Key key;
+    private int entriesLeft;
+    private int pos;
+    
+    public BlockIndexEntry(int pos, int entriesLeft, Key key) {
+      this.pos = pos;
+      this.entriesLeft = entriesLeft;
+      this.key = key;
+    }
+
+    /**
+     * @param key
+     */
+    public BlockIndexEntry(Key key) {
+      this.key = key;
+    }
+
+    public Key getKey() {
+      return key;
+    }
+    
+    public int getEntriesLeft() {
+      return entriesLeft;
+    }
+
+    @Override
+    public int compareTo(BlockIndexEntry o) {
+      return key.compareTo(o.key);
+    }
+    
+    @Override
+    public String toString() {
+      return key + " " + entriesLeft + " " + pos;
+    }
+  }
+  
+  public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
+    // get a local ref to the index, another thread could change it
+    BlockIndexEntry[] blockIndex = this.blockIndex;
+    
+    int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey));
+
+    int index;
+    
+    if (pos < 0) {
+      if (pos == -1)
+        return null; // startKey is less than the first key in the index; the first key of the
+                     // block itself is never indexed, so the caller will scan from the beginning of the block
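+      // Arrays.binarySearch returns -(insertion point) - 1 on a miss, so
+      // (pos * -1) - 2 is insertion point - 1: the last entry whose key precedes startKey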
+      index = (pos * -1) - 2;
+    } else {
+      // found exact key in index
+      index = pos;
+    }
+    
+    // handle case where multiple keys in block are exactly the same, want to find the earliest key in the index
+    while (index - 1 > 0) {
+      if (blockIndex[index].getKey().equals(blockIndex[index - 1].getKey()))
+        index--;
+      else
+        break;
+    }
+    
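+    // if even the earliest index entry equals startKey, earlier duplicates may exist before
+    // it in the block, so fall back to scanning from the beginning of the block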
+    if (index == 0 && blockIndex[index].getKey().equals(startKey))
+      return null;
+
+    BlockIndexEntry bie = blockIndex[index];
+    cacheBlock.seek(bie.pos);
+    return bie;
+  }
+  
+  private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
+    cacheBlock.seek(0);
+    
+    RelativeKey rk = new RelativeKey();
+    Value val = new Value();
+    
+    int interval = indexEntry.getNumEntries() / indexEntries;
+    
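+    // skip building an index when it would put entries only 32 or fewer keys apart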
+    if (interval <= 32)
+      return;
+    
+    // multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one
+    if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1)
+      return;
+
+    int count = 0;
+    
+    ArrayList<BlockIndexEntry> index = new ArrayList<BlockIndexEntry>(indexEntries - 1);
+
+    while (count < (indexEntry.getNumEntries() - interval + 1)) {
+
+      int pos = cacheBlock.getPosition();
+      rk.readFields(cacheBlock);
+      val.readFields(cacheBlock);
+
+      if (count > 0 && count % interval == 0) {
+        index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, rk.getKey()));
+      }
+      
+      count++;
+    }
+
+    this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]);
+
+    cacheBlock.seek(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java
new file mode 100644
index 0000000..f9c8686
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/IndexIterator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
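+// Exposes an iterator over index entries as a SortedKeyValueIterator over their keys.
+// Only hasTop()/getTopKey()/next() are supported; values, seek(), init(), and deepCopy() are not.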
+class IndexIterator implements SortedKeyValueIterator<Key,Value> {
+  
+  private Key key;
+  private Iterator<IndexEntry> indexIter;
+  
+  IndexIterator(Iterator<IndexEntry> indexIter) {
+    this.indexIter = indexIter;
+    if (indexIter.hasNext())
+      key = indexIter.next().getKey();
+    else
+      key = null;
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public Key getTopKey() {
+    return key;
+  }
+  
+  @Override
+  public Value getTopValue() {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public boolean hasTop() {
+    return key != null;
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void next() throws IOException {
+    if (indexIter.hasNext())
+      key = indexIter.next().getKey();
+    else
+      key = null;
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
new file mode 100644
index 0000000..5dade97
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiIndexIterator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.HeapIterator;
+
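+// Merge-sorts several iterators over index entries into a single sorted stream of keys
+// using HeapIterator; close() closes the backing RFile reader.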
+class MultiIndexIterator extends HeapIterator implements FileSKVIterator {
+  
+  private RFile.Reader source;
+  
+  MultiIndexIterator(RFile.Reader source, List<Iterator<IndexEntry>> indexes) {
+    super(indexes.size());
+    
+    this.source = source;
+    
+    for (Iterator<IndexEntry> index : indexes) {
+      addSource(new IndexIterator(index));
+    }
+  }
+  
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    source.close();
+  }
+  
+  @Override
+  public void closeDeepCopies() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public Key getFirstKey() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public Key getLastKey() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public DataInputStream getMetaStore(String name) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void setInterruptFlag(AtomicBoolean flag) {
+    throw new UnsupportedOperationException();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
new file mode 100644
index 0000000..b973cc3
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.RandomAccess;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.blockfile.ABlockWriter;
+import org.apache.accumulo.core.file.blockfile.BlockFileReader;
+import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
+import org.apache.accumulo.core.file.rfile.bcfile.Utils;
+import org.apache.hadoop.io.WritableComparable;
+
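+// A B-tree-like index: level 0 blocks point at data blocks, higher levels point at lower
+// index blocks; only the root block is written out by Writer.close().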
+public class MultiLevelIndex {
+  
+  public static class IndexEntry implements WritableComparable<IndexEntry> {
+    private Key key;
+    private int entries;
+    private long offset;
+    private long compressedSize;
+    private long rawSize;
+    private boolean newFormat;
+    
+    IndexEntry(Key k, int e, long offset, long compressedSize, long rawSize) {
+      this.key = k;
+      this.entries = e;
+      this.offset = offset;
+      this.compressedSize = compressedSize;
+      this.rawSize = rawSize;
+      newFormat = true;
+    }
+    
+    public IndexEntry(boolean newFormat) {
+      this.newFormat = newFormat;
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      key = new Key();
+      key.readFields(in);
+      entries = in.readInt();
+      if (newFormat) {
+        offset = Utils.readVLong(in);
+        compressedSize = Utils.readVLong(in);
+        rawSize = Utils.readVLong(in);
+      } else {
+        offset = -1;
+        compressedSize = -1;
+        rawSize = -1;
+      }
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+      key.write(out);
+      out.writeInt(entries);
+      if (newFormat) {
+        Utils.writeVLong(out, offset);
+        Utils.writeVLong(out, compressedSize);
+        Utils.writeVLong(out, rawSize);
+      }
+    }
+    
+    public Key getKey() {
+      return key;
+    }
+    
+    public int getNumEntries() {
+      return entries;
+    }
+    
+    public long getOffset() {
+      return offset;
+    }
+    
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+    
+    public long getRawSize() {
+      return rawSize;
+    }
+    
+    @Override
+    public int compareTo(IndexEntry o) {
+      return key.compareTo(o.key);
+    }
+  }
+  
+  // a list that deserializes index entries on demand
+  private static class SerializedIndex extends AbstractList<IndexEntry> implements List<IndexEntry>, RandomAccess {
+    
+    private int[] offsets;
+    private byte[] data;
+    private boolean newFormat;
+    
+    SerializedIndex(int[] offsets, byte[] data, boolean newFormat) {
+      this.offsets = offsets;
+      this.data = data;
+      this.newFormat = newFormat;
+    }
+    
+    @Override
+    public IndexEntry get(int index) {
+      int len;
+      if (index == offsets.length - 1)
+        len = data.length - offsets[index];
+      else
+        len = offsets[index + 1] - offsets[index];
+      
+      ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
+      DataInputStream dis = new DataInputStream(bais);
+      
+      IndexEntry ie = new IndexEntry(newFormat);
+      try {
+        ie.readFields(dis);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      
+      return ie;
+    }
+    
+    @Override
+    public int size() {
+      return offsets.length;
+    }
+    
+    public long sizeInBytes() {
+      return data.length + 4 * offsets.length;
+    }
+    
+  }
+  
+  private static class KeyIndex extends AbstractList<Key> implements List<Key>, RandomAccess {
+    
+    private int[] offsets;
+    private byte[] data;
+    
+    KeyIndex(int[] offsets, byte[] data) {
+      this.offsets = offsets;
+      this.data = data;
+    }
+    
+    @Override
+    public Key get(int index) {
+      int len;
+      if (index == offsets.length - 1)
+        len = data.length - offsets[index];
+      else
+        len = offsets[index + 1] - offsets[index];
+      
+      ByteArrayInputStream bais = new ByteArrayInputStream(data, offsets[index], len);
+      DataInputStream dis = new DataInputStream(bais);
+      
+      Key key = new Key();
+      try {
+        key.readFields(dis);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      
+      return key;
+    }
+    
+    @Override
+    public int size() {
+      return offsets.length;
+    }
+  }
+  
+  static class IndexBlock {
+    
+    private ByteArrayOutputStream indexBytes;
+    private DataOutputStream indexOut;
+    
+    private ArrayList<Integer> offsets;
+    private int level;
+    private int offset;
+    
+    SerializedIndex index;
+    KeyIndex keyIndex;
+    private boolean hasNext;
+    
+    public IndexBlock(int level, int totalAdded) {
+      this.level = level;
+      this.offset = totalAdded;
+      
+      indexBytes = new ByteArrayOutputStream();
+      indexOut = new DataOutputStream(indexBytes);
+      offsets = new ArrayList<Integer>();
+    }
+    
+    public IndexBlock() {}
+    
+    public void add(Key key, int value, long offset, long compressedSize, long rawSize) throws IOException {
+      offsets.add(indexOut.size());
+      new IndexEntry(key, value, offset, compressedSize, rawSize).write(indexOut);
+    }
+    
+    int getSize() {
+      return indexOut.size() + 4 * offsets.size();
+    }
+    
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(level);
+      out.writeInt(offset);
+      out.writeBoolean(hasNext);
+      
+      out.writeInt(offsets.size());
+      for (Integer offset : offsets) {
+        out.writeInt(offset);
+      }
+      
+      indexOut.close();
+      byte[] indexData = indexBytes.toByteArray();
+      
+      out.writeInt(indexData.length);
+      out.write(indexData);
+    }
+    
+    public void readFields(DataInput in, int version) throws IOException {
+      
+      if (version == RFile.RINDEX_VER_6) {
+        level = in.readInt();
+        offset = in.readInt();
+        hasNext = in.readBoolean();
+        
+        int numOffsets = in.readInt();
+        int[] offsets = new int[numOffsets];
+        
+        for (int i = 0; i < numOffsets; i++)
+          offsets[i] = in.readInt();
+        
+        int indexSize = in.readInt();
+        byte[] serializedIndex = new byte[indexSize];
+        in.readFully(serializedIndex);
+        
+        index = new SerializedIndex(offsets, serializedIndex, true);
+        keyIndex = new KeyIndex(offsets, serializedIndex);
+      } else if (version == RFile.RINDEX_VER_3) {
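+        // version 3 stored no per-entry offsets; re-read each entry and re-serialize it,
+        // recording offsets as we go, so SerializedIndex can address entries by position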
+        level = 0;
+        offset = 0;
+        hasNext = false;
+        
+        int size = in.readInt();
+        
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        ArrayList<Integer> oal = new ArrayList<Integer>();
+        
+        for (int i = 0; i < size; i++) {
+          IndexEntry ie = new IndexEntry(false);
+          oal.add(dos.size());
+          ie.readFields(in);
+          ie.write(dos);
+        }
+        
+        dos.close();
+        
+        int[] oia = new int[oal.size()];
+        for (int i = 0; i < oal.size(); i++) {
+          oia[i] = oal.get(i);
+        }
+        
+        byte[] serializedIndex = baos.toByteArray();
+        index = new SerializedIndex(oia, serializedIndex, false);
+        keyIndex = new KeyIndex(oia, serializedIndex);
+      } else if (version == RFile.RINDEX_VER_4) {
+        level = 0;
+        offset = 0;
+        hasNext = false;
+        
+        int numIndexEntries = in.readInt();
+        int offsets[] = new int[numIndexEntries];
+        for (int i = 0; i < numIndexEntries; i++) {
+          offsets[i] = in.readInt();
+        }
+        
+        int size = in.readInt();
+        byte[] indexData = new byte[size];
+        in.readFully(indexData);
+        
+        index = new SerializedIndex(offsets, indexData, false);
+        keyIndex = new KeyIndex(offsets, indexData);
+      } else {
+        throw new RuntimeException("Unexpected version " + version);
+      }
+      
+    }
+    
+    List<IndexEntry> getIndex() {
+      return index;
+    }
+    
+    public List<Key> getKeyIndex() {
+      return keyIndex;
+    }
+    
+    int getLevel() {
+      return level;
+    }
+    
+    int getOffset() {
+      return offset;
+    }
+    
+    boolean hasNext() {
+      return hasNext;
+    }
+    
+    void setHasNext(boolean b) {
+      this.hasNext = b;
+    }
+    
+  }
+  
+  /**
+   * Buffers writes to the index so that chunks of index blocks end up contiguous in the file.
+   * Without buffering, index blocks would be sprinkled throughout the file, making scans of
+   * the entire index slow.
+   */
+  public static class BufferedWriter {
+    
+    private Writer writer;
+    private DataOutputStream buffer;
+    private int buffered;
+    private ByteArrayOutputStream baos;
+    
+    public BufferedWriter(Writer writer) {
+      this.writer = writer;
+      baos = new ByteArrayOutputStream(1 << 20);
+      buffer = new DataOutputStream(baos);
+      buffered = 0;
+    }
+    
+    private void flush() throws IOException {
+      buffer.close();
+      
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+      
+      IndexEntry ie = new IndexEntry(true);
+      for (int i = 0; i < buffered; i++) {
+        ie.readFields(dis);
+        writer.add(ie.getKey(), ie.getNumEntries(), ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
+      }
+      
+      buffered = 0;
+      baos = new ByteArrayOutputStream(1 << 20);
+      buffer = new DataOutputStream(baos);
+      
+    }
+    
+    public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
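+      // flush to the real index writer once about 10 MiB is buffered
+      // ((10 * 1) << 20 = 10,485,760 bytes; * binds tighter than <<)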
+      if (buffer.size() > (10 * 1 << 20)) {
+        flush();
+      }
+      
+      new IndexEntry(key, data, offset, compressedSize, rawSize).write(buffer);
+      buffered++;
+    }
+    
+    public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+      flush();
+      writer.addLast(key, data, offset, compressedSize, rawSize);
+    }
+    
+    public void close(DataOutput out) throws IOException {
+      writer.close(out);
+    }
+  }
+
+  public static class Writer {
+    private int threshold;
+    
+    private ArrayList<IndexBlock> levels;
+    
+    private int totalAdded;
+    
+    private boolean addedLast = false;
+    
+    private BlockFileWriter blockFileWriter;
+    
+    Writer(BlockFileWriter blockFileWriter, int maxBlockSize) {
+      this.blockFileWriter = blockFileWriter;
+      this.threshold = maxBlockSize;
+      levels = new ArrayList<IndexBlock>();
+    }
+    
+    private void add(int level, Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+      if (level == levels.size()) {
+        levels.add(new IndexBlock(level, 0));
+      }
+      
+      IndexBlock iblock = levels.get(level);
+      
+      iblock.add(key, data, offset, compressedSize, rawSize);
+    }
+    
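+    // when a block at the given level outgrows the threshold, write it out and add an
+    // entry for it at the next level up, recursing; this is what makes the index multi-level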
+    private void flush(int level, Key lastKey, boolean last) throws IOException {
+      
+      if (last && level == levels.size() - 1)
+        return;
+      
+      IndexBlock iblock = levels.get(level);
+      if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) {
+        ABlockWriter out = blockFileWriter.prepareDataBlock();
+        iblock.setHasNext(!last);
+        iblock.write(out);
+        out.close();
+        
+        add(level + 1, lastKey, 0, out.getStartPos(), out.getCompressedSize(), out.getRawSize());
+        flush(level + 1, lastKey, last);
+        
+        if (last)
+          levels.set(level, null);
+        else
+          levels.set(level, new IndexBlock(level, totalAdded));
+      }
+    }
+    
+    public void add(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+      totalAdded++;
+      add(0, key, data, offset, compressedSize, rawSize);
+      flush(0, key, false);
+    }
+    
+    public void addLast(Key key, int data, long offset, long compressedSize, long rawSize) throws IOException {
+      if (addedLast)
+        throw new IllegalStateException("already added last");
+      
+      totalAdded++;
+      add(0, key, data, offset, compressedSize, rawSize);
+      flush(0, key, true);
+      addedLast = true;
+      
+    }
+    
+    public void close(DataOutput out) throws IOException {
+      if (totalAdded > 0 && !addedLast)
+        throw new IllegalStateException("did not call addLast");
+      
+      out.writeInt(totalAdded);
+      // save root node
+      if (levels.size() > 0) {
+        levels.get(levels.size() - 1).write(out);
+      } else {
+        new IndexBlock(0, 0).write(out);
+      }
+      
+    }
+  }
+  
+  public static class Reader {
+    private IndexBlock rootBlock;
+    private BlockFileReader blockStore;
+    private int version;
+    private int size;
+    
+    public class Node {
+      
+      private Node parent;
+      private IndexBlock indexBlock;
+      private int currentPos;
+      
+      Node(Node parent, IndexBlock iBlock) {
+        this.parent = parent;
+        this.indexBlock = iBlock;
+      }
+      
+      Node(IndexBlock rootInfo) {
+        this.parent = null;
+        this.indexBlock = rootInfo;
+      }
+      
+      private Node lookup(Key key) throws IOException {
+        int pos = Collections.binarySearch(indexBlock.getKeyIndex(), key, new Comparator<Key>() {
+          @Override
+          public int compare(Key o1, Key o2) {
+            return o1.compareTo(o2);
+          }
+        });
+        
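+        // on a miss binarySearch returns -(insertion point) - 1; convert it so pos is
+        // the first index entry with a key >= the lookup key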
+        if (pos < 0)
+          pos = (pos * -1) - 1;
+        
+        if (pos == indexBlock.getIndex().size()) {
+          if (parent != null)
+            throw new IllegalStateException();
+          this.currentPos = pos;
+          return this;
+        }
+        
+        this.currentPos = pos;
+        
+        if (indexBlock.getLevel() == 0) {
+          return this;
+        }
+        
+        IndexEntry ie = indexBlock.getIndex().get(pos);
+        Node child = new Node(this, getIndexBlock(ie));
+        return child.lookup(key);
+      }
+      
+      private Node getLast() throws IOException {
+        currentPos = indexBlock.getIndex().size() - 1;
+        if (indexBlock.getLevel() == 0)
+          return this;
+        
+        IndexEntry ie = indexBlock.getIndex().get(currentPos);
+        Node child = new Node(this, getIndexBlock(ie));
+        return child.getLast();
+      }
+      
+      private Node getFirst() throws IOException {
+        currentPos = 0;
+        if (indexBlock.getLevel() == 0)
+          return this;
+        
+        IndexEntry ie = indexBlock.getIndex().get(currentPos);
+        Node child = new Node(this, getIndexBlock(ie));
+        return child.getFirst();
+      }
+      
+      private Node getPrevious() throws IOException {
+        if (currentPos == 0)
+          return parent.getPrevious();
+        
+        currentPos--;
+        
+        IndexEntry ie = indexBlock.getIndex().get(currentPos);
+        Node child = new Node(this, getIndexBlock(ie));
+        return child.getLast();
+        
+      }
+      
+      private Node getNext() throws IOException {
+        if (currentPos == indexBlock.getIndex().size() - 1)
+          return parent.getNext();
+        
+        currentPos++;
+        
+        IndexEntry ie = indexBlock.getIndex().get(currentPos);
+        Node child = new Node(this, getIndexBlock(ie));
+        return child.getFirst();
+        
+      }
+      
+      Node getNextNode() throws IOException {
+        return parent.getNext();
+      }
+      
+      Node getPreviousNode() throws IOException {
+        return parent.getPrevious();
+      }
+    }
+    
+    public class IndexIterator implements ListIterator<IndexEntry> {
+      
+      private Node node;
+      private ListIterator<IndexEntry> liter;
+      
+      private Node getPrevNode() {
+        try {
+          return node.getPreviousNode();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      
+      private Node getNextNode() {
+        try {
+          return node.getNextNode();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      
+      public IndexIterator() {
+        node = null;
+      }
+      
+      public IndexIterator(Node node) {
+        this.node = node;
+        liter = node.indexBlock.getIndex().listIterator(node.currentPos);
+      }
+      
+      @Override
+      public boolean hasNext() {
+        if (node == null)
+          return false;
+        
+        if (!liter.hasNext()) {
+          return node.indexBlock.hasNext();
+        } else {
+          return true;
+        }
+        
+      }
+      
+      public IndexEntry peekPrevious() {
+        IndexEntry ret = previous();
+        next();
+        return ret;
+      }
+      
+      public IndexEntry peek() {
+        IndexEntry ret = next();
+        previous();
+        return ret;
+      }
+      
+      @Override
+      public IndexEntry next() {
+        if (!liter.hasNext()) {
+          node = getNextNode();
+          liter = node.indexBlock.getIndex().listIterator();
+        }
+        
+        return liter.next();
+      }
+      
+      @Override
+      public boolean hasPrevious() {
+        if (node == null)
+          return false;
+        
+        if (!liter.hasPrevious()) {
+          return node.indexBlock.getOffset() > 0;
+        } else {
+          return true;
+        }
+      }
+      
+      @Override
+      public IndexEntry previous() {
+        if (!liter.hasPrevious()) {
+          node = getPrevNode();
+          liter = node.indexBlock.getIndex().listIterator(node.indexBlock.getIndex().size());
+        }
+        
+        return liter.previous();
+      }
+      
+      @Override
+      public int nextIndex() {
+        return node.indexBlock.getOffset() + liter.nextIndex();
+      }
+      
+      @Override
+      public int previousIndex() {
+        return node.indexBlock.getOffset() + liter.previousIndex();
+      }
+      
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+      
+      @Override
+      public void set(IndexEntry e) {
+        throw new UnsupportedOperationException();
+        
+      }
+      
+      @Override
+      public void add(IndexEntry e) {
+        throw new UnsupportedOperationException();
+      }
+      
+    }
+    
+    public Reader(BlockFileReader blockStore, int version) {
+      this.version = version;
+      this.blockStore = blockStore;
+    }
+    
+    private IndexBlock getIndexBlock(IndexEntry ie) throws IOException {
+      IndexBlock iblock = new IndexBlock();
+      ABlockReader in = blockStore.getMetaBlock(ie.getOffset(), ie.getCompressedSize(), ie.getRawSize());
+      iblock.readFields(in, version);
+      in.close();
+      
+      return iblock;
+    }
+    
+    public IndexIterator lookup(Key key) throws IOException {
+      Node node = new Node(rootBlock);
+      return new IndexIterator(node.lookup(key));
+    }
+    
+    public void readFields(DataInput in) throws IOException {
+      
+      size = 0;
+      
+      if (version == RFile.RINDEX_VER_6) {
+        size = in.readInt();
+      }
+      
+      rootBlock = new IndexBlock();
+      rootBlock.readFields(in, version);
+      
+      if (version == RFile.RINDEX_VER_3 || version == RFile.RINDEX_VER_4) {
+        size = rootBlock.getIndex().size();
+      }
+    }
+    
+    public int size() {
+      return size;
+    }
+    
+    private void getIndexInfo(IndexBlock ib, Map<Integer,Long> sizesByLevel, Map<Integer,Long> countsByLevel) throws IOException {
+      Long size = sizesByLevel.get(ib.getLevel());
+      if (size == null)
+        size = 0L;
+      
+      Long count = countsByLevel.get(ib.getLevel());
+      if (count == null)
+        count = 0L;
+      
+      size += ib.index.sizeInBytes();
+      count++;
+      
+      sizesByLevel.put(ib.getLevel(), size);
+      countsByLevel.put(ib.getLevel(), count);
+      
+      if (ib.getLevel() > 0) {
+        for (IndexEntry ie : ib.index) {
+          IndexBlock cib = getIndexBlock(ie);
+          getIndexInfo(cib, sizesByLevel, countsByLevel);
+        }
+      }
+    }
+    
+    public void getIndexInfo(Map<Integer,Long> sizes, Map<Integer,Long> counts) throws IOException {
+      getIndexInfo(rootBlock, sizes, counts);
+    }
+    
+    public Key getLastKey() {
+      return rootBlock.getIndex().get(rootBlock.getIndex().size() - 1).getKey();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
new file mode 100644
index 0000000..2cc7fa7
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.util.ArrayList;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class PrintInfo {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    // not for client use
+    @SuppressWarnings("deprecation")
+    FileSystem fs = FileUtil.getFileSystem(conf, AccumuloConfiguration.getSiteConfiguration());
+    
+    Options opts = new Options();
+    Option dumpKeys = new Option("d", "dump", false, "dump the key/value pairs");
+    opts.addOption(dumpKeys);
+    Option histogramOption = new Option("h", "histogram", false, "print a histogram of the key-value sizes");
+    opts.addOption(histogramOption);
+    
+    CommandLine commandLine = new BasicParser().parse(opts, args);
+    
+    boolean dump = commandLine.hasOption(dumpKeys.getOpt());
+    boolean doHistogram = commandLine.hasOption(histogramOption.getOpt());
+    long[] countBuckets = new long[11];
+    long[] sizeBuckets = new long[countBuckets.length];
+    long totalSize = 0;
+
+    for (String arg : commandLine.getArgs()) {
+      
+      Path path = new Path(arg);
+      CachableBlockFile.Reader _rdr = new CachableBlockFile.Reader(fs, path, conf, null, null);
+      Reader iter = new RFile.Reader(_rdr);
+      
+      iter.printInfo();
+      System.out.println();
+      org.apache.accumulo.core.file.rfile.bcfile.PrintInfo.main(new String[] {arg});
+      
+      if (doHistogram || dump) {
+        iter.seek(new Range((Key) null, (Key) null), new ArrayList<ByteSequence>(), false);
+        while (iter.hasTop()) {
+          Key key = iter.getTopKey();
+          Value value = iter.getTopValue();
+          if (dump)
+            System.out.println(key + " -> " + value);
+          if (doHistogram) {
+            long size = key.getSize() + value.getSize();
+            int bucket = (int) Math.log10(size);
+            countBuckets[bucket]++;
+            sizeBuckets[bucket] += size;
+            totalSize += size;
+          }
+          iter.next();
+        }
+      }
+      iter.close();
+      if (doHistogram) {
+        System.out.println("Up to size      count      %-age");
+        for (int i = 1; i < countBuckets.length; i++) {
+          System.out.println(String.format("%11.0f : %10d %6.2f%%", Math.pow(10, i), countBuckets[i], sizeBuckets[i] * 100. / totalSize));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
new file mode 100644
index 0000000..2362d56
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -0,0 +1,1181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.NoSuchMetaStoreException;
+import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.blockfile.ABlockWriter;
+import org.apache.accumulo.core.file.blockfile.BlockFileReader;
+import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
+import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
+import org.apache.accumulo.core.file.rfile.RelativeKey.MByteSequence;
+import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
+import org.apache.accumulo.core.iterators.IterationInterruptedException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.HeapIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
+
+public class RFile {
+  
+  public static final String EXTENSION = "rf";
+  
+  private static final Logger log = Logger.getLogger(RFile.class);
+  
+  private RFile() {}
+  
+  private static final int RINDEX_MAGIC = 0x20637474;
+  static final int RINDEX_VER_6 = 6;
+  static final int RINDEX_VER_4 = 4;
+  static final int RINDEX_VER_3 = 3;
+  
+  private static class Count {
+    public Count(int i) {
+      this.count = i;
+    }
+    
+    public Count(long count) {
+      this.count = count;
+    }
+    
+    long count;
+  }
+  
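+  // per-locality-group metadata: first key, column family counts, and the group's index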
+  private static class LocalityGroupMetadata implements Writable {
+    
+    private int startBlock;
+    private Key firstKey;
+    private Map<ByteSequence,Count> columnFamilies;
+    
+    private boolean isDefaultLG = false;
+    private String name;
+    private Set<ByteSequence> previousColumnFamilies;
+    
+    private MultiLevelIndex.BufferedWriter indexWriter;
+    private MultiLevelIndex.Reader indexReader;
+    
+    public LocalityGroupMetadata(int version, BlockFileReader br) {
+      columnFamilies = new HashMap<ByteSequence,Count>();
+      indexReader = new MultiLevelIndex.Reader(br, version);
+    }
+    
+    public LocalityGroupMetadata(int nextBlock, Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
+      this.startBlock = nextBlock;
+      isDefaultLG = true;
+      columnFamilies = new HashMap<ByteSequence,Count>();
+      previousColumnFamilies = pcf;
+      
+      indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
+    }
+    
+    public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int nextBlock, int indexBlockSize, BlockFileWriter bfw) {
+      this.startBlock = nextBlock;
+      this.name = name;
+      isDefaultLG = false;
+      columnFamilies = new HashMap<ByteSequence,Count>();
+      for (ByteSequence cf : cfset) {
+        columnFamilies.put(cf, new Count(0));
+      }
+      
+      indexWriter = new MultiLevelIndex.BufferedWriter(new MultiLevelIndex.Writer(bfw, indexBlockSize));
+    }
+    
+    private Key getFirstKey() {
+      return firstKey;
+    }
+    
+    private void setFirstKey(Key key) {
+      if (firstKey != null)
+        throw new IllegalStateException();
+      this.firstKey = new Key(key);
+    }
+    
+    public void updateColumnCount(Key key) {
+      
+      if (isDefaultLG && columnFamilies == null) {
+        if (previousColumnFamilies.size() > 0) {
+          // only do this check when there are previous column families
+          ByteSequence cf = key.getColumnFamilyData();
+          if (previousColumnFamilies.contains(cf)) {
+            throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
+          }
+        }
+        
+        // no longer keeping track of column families, so return
+        return;
+      }
+      
+      ByteSequence cf = key.getColumnFamilyData();
+      Count count = columnFamilies.get(cf);
+      
+      if (count == null) {
+        if (!isDefaultLG) {
+          throw new IllegalArgumentException("invalid column family : " + cf);
+        }
+        
+        if (previousColumnFamilies.contains(cf)) {
+          throw new IllegalArgumentException("Added column family \"" + cf + "\" to default locality group that was in previous locality group");
+        }
+        
+        if (columnFamilies.size() > Writer.MAX_CF_IN_DLG) {
+          // stop keeping track, there are too many
+          columnFamilies = null;
+          return;
+        }
+        count = new Count(0);
+        columnFamilies.put(new ArrayByteSequence(cf.getBackingArray(), cf.offset(), cf.length()), count);
+        
+      }
+      
+      count.count++;
+      
+    }
+    
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      
+      isDefaultLG = in.readBoolean();
+      if (!isDefaultLG) {
+        name = in.readUTF();
+      }
+      
+      startBlock = in.readInt();
+      
+      int size = in.readInt();
+      
+      if (size == -1) {
+        if (!isDefaultLG)
+          throw new IllegalStateException("Non default LG " + name + " does not have column families");
+        
+        columnFamilies = null;
+      } else {
+        if (columnFamilies == null)
+          columnFamilies = new HashMap<ByteSequence,Count>();
+        else
+          columnFamilies.clear();
+        
+        for (int i = 0; i < size; i++) {
+          int len = in.readInt();
+          byte cf[] = new byte[len];
+          in.readFully(cf);
+          long count = in.readLong();
+          
+          columnFamilies.put(new ArrayByteSequence(cf), new Count(count));
+        }
+      }
+      
+      if (in.readBoolean()) {
+        firstKey = new Key();
+        firstKey.readFields(in);
+      } else {
+        firstKey = null;
+      }
+      
+      indexReader.readFields(in);
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+      
+      out.writeBoolean(isDefaultLG);
+      if (!isDefaultLG) {
+        out.writeUTF(name);
+      }
+      
+      out.writeInt(startBlock);
+      
+      if (isDefaultLG && columnFamilies == null) {
+        // only expect null when default LG, otherwise let a NPE occur
+        out.writeInt(-1);
+      } else {
+        out.writeInt(columnFamilies.size());
+        
+        for (Entry<ByteSequence,Count> entry : columnFamilies.entrySet()) {
+          out.writeInt(entry.getKey().length());
+          out.write(entry.getKey().getBackingArray(), entry.getKey().offset(), entry.getKey().length());
+          out.writeLong(entry.getValue().count);
+        }
+      }
+      
+      out.writeBoolean(firstKey != null);
+      if (firstKey != null)
+        firstKey.write(out);
+      
+      indexWriter.close(out);
+    }
+    
+    public void printInfo() throws IOException {
+      PrintStream out = System.out;
+      out.println("Locality group         : " + (isDefaultLG ? "<DEFAULT>" : name));
+      out.println("\tStart block          : " + startBlock);
+      out.println("\tNum   blocks         : " + String.format("%,d", indexReader.size()));
+      TreeMap<Integer,Long> sizesByLevel = new TreeMap<Integer,Long>();
+      TreeMap<Integer,Long> countsByLevel = new TreeMap<Integer,Long>();
+      indexReader.getIndexInfo(sizesByLevel, countsByLevel);
+      for (Entry<Integer,Long> entry : sizesByLevel.descendingMap().entrySet()) {
+        out.println("\tIndex level " + entry.getKey() + "        : "
+            + String.format("%,d bytes  %,d blocks", entry.getValue(), countsByLevel.get(entry.getKey())));
+      }
+      out.println("\tFirst key            : " + firstKey);
+      
+      Key lastKey = null;
+      if (indexReader != null && indexReader.size() > 0) {
+        lastKey = indexReader.getLastKey();
+      }
+      
+      out.println("\tLast key             : " + lastKey);
+      
+      long numKeys = 0;
+      IndexIterator countIter = indexReader.lookup(new Key());
+      while (countIter.hasNext()) {
+        numKeys += countIter.next().getNumEntries();
+      }
+      
+      out.println("\tNum entries          : " + String.format("%,d", numKeys));
+      out.println("\tColumn families      : " + (isDefaultLG && columnFamilies == null ? "<UNKNOWN>" : columnFamilies.keySet()));
+    }
+    
+  }
+  
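+  /**
+   * Writes an rfile. Expected call order: start a locality group, append() keys in sorted order, repeat for additional groups, then close(). A data block is
+   * cut once its raw size exceeds blockSize, and the last key of each block is recorded in the group's multi-level index.
+   */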
+  public static class Writer implements FileSKVWriter {
+    
+    public static final int MAX_CF_IN_DLG = 1000;
+    
+    private BlockFileWriter fileWriter;
+    private ABlockWriter blockWriter;
+    
+    private long blockSize = 100000;
+    private int indexBlockSize;
+    private int entries = 0;
+    
+    private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
+    private LocalityGroupMetadata currentLocalityGroup = null;
+    private int nextBlock = 0;
+    
+    private Key lastKeyInBlock = null;
+    
+    private boolean dataClosed = false;
+    private boolean closed = false;
+    private Key prevKey = new Key();
+    private boolean startedDefaultLocalityGroup = false;
+    
+    private HashSet<ByteSequence> previousColumnFamilies;
+    
+    public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
+      this(bfw, blockSize, (int) AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
+    }
+    
+    public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize) throws IOException {
+      this.blockSize = blockSize;
+      this.indexBlockSize = indexBlockSize;
+      this.fileWriter = bfw;
+      this.blockWriter = null;
+      previousColumnFamilies = new HashSet<ByteSequence>();
+    }
+    
+    public synchronized void close() throws IOException {
+      
+      if (closed) {
+        return;
+      }
+      
+      closeData();
+      
+      ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
+      
+      mba.writeInt(RINDEX_MAGIC);
+      mba.writeInt(RINDEX_VER_6);
+      
+      if (currentLocalityGroup != null)
+        localityGroups.add(currentLocalityGroup);
+      
+      mba.writeInt(localityGroups.size());
+      
+      for (LocalityGroupMetadata lc : localityGroups) {
+        lc.write(mba);
+      }
+      
+      mba.close();
+      
+      fileWriter.close();
+      
+      closed = true;
+    }
+    
+    private void closeData() throws IOException {
+      
+      if (dataClosed) {
+        return;
+      }
+      
+      dataClosed = true;
+      
+      if (blockWriter != null) {
+        closeBlock(lastKeyInBlock, true);
+      }
+    }
+    
+    public void append(Key key, Value value) throws IOException {
+      
+      if (dataClosed) {
+        throw new IllegalStateException("Cannont append, data closed");
+      }
+      
+      if (key.compareTo(prevKey) < 0) {
+        throw new IllegalStateException("Keys appended out-of-order.  New key " + key + ", previous key " + prevKey);
+      }
+      
+      currentLocalityGroup.updateColumnCount(key);
+      
+      if (currentLocalityGroup.getFirstKey() == null) {
+        currentLocalityGroup.setFirstKey(key);
+      }
+      
+      if (blockWriter == null) {
+        blockWriter = fileWriter.prepareDataBlock();
+      } else if (blockWriter.getRawSize() > blockSize) {
+        closeBlock(prevKey, false);
+        blockWriter = fileWriter.prepareDataBlock();
+      }
+      
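+      // each key is encoded relative to the previous key in the block;
+      // lastKeyInBlock is null for the first key of a new block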
+      RelativeKey rk = new RelativeKey(lastKeyInBlock, key);
+      
+      rk.write(blockWriter);
+      value.write(blockWriter);
+      entries++;
+      
+      prevKey = new Key(key);
+      lastKeyInBlock = prevKey;
+      
+    }
+    
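+    // finishes the current data block and records its last key, entry count,
+    // and size in the locality group's index; addLast() marks the final block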
+    private void closeBlock(Key key, boolean lastBlock) throws IOException {
+      blockWriter.close();
+      
+      if (lastBlock)
+        currentLocalityGroup.indexWriter.addLast(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+      else
+        currentLocalityGroup.indexWriter.add(key, entries, blockWriter.getStartPos(), blockWriter.getCompressedSize(), blockWriter.getRawSize());
+      
+      blockWriter = null;
+      lastKeyInBlock = null;
+      entries = 0;
+      nextBlock++;
+    }
+    
+    @Override
+    public DataOutputStream createMetaStore(String name) throws IOException {
+      closeData();
+      
+      return (DataOutputStream) fileWriter.prepareMetaBlock(name);
+    }
+    
+    private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
+      if (dataClosed) {
+        throw new IllegalStateException("data closed");
+      }
+      
+      if (startedDefaultLocalityGroup) {
+        throw new IllegalStateException("Can not start anymore new locality groups after default locality group started");
+      }
+      
+      if (blockWriter != null) {
+        closeBlock(lastKeyInBlock, true);
+      }
+      
+      if (currentLocalityGroup != null) {
+        localityGroups.add(currentLocalityGroup);
+      }
+      
+      if (columnFamilies == null) {
+        startedDefaultLocalityGroup = true;
+        currentLocalityGroup = new LocalityGroupMetadata(nextBlock, previousColumnFamilies, indexBlockSize, fileWriter);
+      } else {
+        if (!Collections.disjoint(columnFamilies, previousColumnFamilies)) {
+          HashSet<ByteSequence> overlap = new HashSet<ByteSequence>(columnFamilies);
+          overlap.retainAll(previousColumnFamilies);
+          throw new IllegalArgumentException("Column families over lap with previous locality group : " + overlap);
+        }
+        currentLocalityGroup = new LocalityGroupMetadata(name, columnFamilies, nextBlock, indexBlockSize, fileWriter);
+        previousColumnFamilies.addAll(columnFamilies);
+      }
+      
+      prevKey = new Key();
+    }
+    
+    @Override
+    public void startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies) throws IOException {
+      if (columnFamilies == null)
+        throw new NullPointerException();
+      
+      _startNewLocalityGroup(name, columnFamilies);
+    }
+    
+    @Override
+    public void startDefaultLocalityGroup() throws IOException {
+      _startNewLocalityGroup(null, null);
+    }
+    
+    @Override
+    public boolean supportsLocalityGroups() {
+      return true;
+    }
+  }
+  
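+  // iterates over a single locality group's data blocks, using the
+  // multi-level index to locate the block containing a seek key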
+  private static class LocalityGroupReader implements FileSKVIterator {
+    
+    private BlockFileReader reader;
+    private MultiLevelIndex.Reader index;
+    private int blockCount;
+    private Key firstKey;
+    private int startBlock;
+    private Map<ByteSequence,Count> columnFamilies;
+    private boolean isDefaultLocalityGroup;
+    private boolean closed = false;
+    private int version;
+    
+    private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version) throws IOException {
+      this.firstKey = lgm.firstKey;
+      this.index = lgm.indexReader;
+      this.startBlock = lgm.startBlock;
+      blockCount = index.size();
+      this.columnFamilies = lgm.columnFamilies;
+      this.isDefaultLocalityGroup = lgm.isDefaultLG;
+      this.version = version;
+      
+      this.reader = reader;
+      
+    }
+    
+    public LocalityGroupReader(LocalityGroupReader lgr) {
+      this.firstKey = lgr.firstKey;
+      this.index = lgr.index;
+      this.startBlock = lgr.startBlock;
+      this.blockCount = lgr.blockCount;
+      this.columnFamilies = lgr.columnFamilies;
+      this.isDefaultLocalityGroup = lgr.isDefaultLocalityGroup;
+      this.reader = lgr.reader;
+      this.version = lgr.version;
+    }
+    
+    Iterator<IndexEntry> getIndex() throws IOException {
+      return index.lookup(new Key());
+    }
+    
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      hasTop = false;
+      if (currBlock != null)
+        currBlock.close();
+      
+    }
+    
+    private IndexIterator iiter;
+    private int entriesLeft;
+    private ABlockReader currBlock;
+    private RelativeKey rk;
+    private Value val;
+    private Key prevKey = null;
+    private Range range = null;
+    private boolean hasTop = false;
+    private AtomicBoolean interruptFlag;
+    
+    @Override
+    public Key getTopKey() {
+      return rk.getKey();
+    }
+    
+    @Override
+    public Value getTopValue() {
+      return val;
+    }
+    
+    @Override
+    public boolean hasTop() {
+      return hasTop;
+    }
+    
+    @Override
+    public void next() throws IOException {
+      try {
+        _next();
+      } catch (IOException ioe) {
+        reset();
+        throw ioe;
+      }
+    }
+    
+    private void _next() throws IOException {
+      
+      if (!hasTop)
+        throw new IllegalStateException("called next() when there is no top");
+      
+      if (entriesLeft == 0) {
+        currBlock.close();
+        
+        if (iiter.hasNext()) {
+          IndexEntry indexEntry = iiter.next();
+          entriesLeft = indexEntry.getNumEntries();
+          currBlock = getDataBlock(indexEntry);
+        } else {
+          rk = null;
+          val = null;
+          hasTop = false;
+          return;
+        }
+      }
+      
+      prevKey = rk.getKey();
+      rk.readFields(currBlock);
+      val.readFields(currBlock);
+      entriesLeft--;
+      hasTop = !range.afterEndKey(rk.getKey());
+    }
+    
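+    // versions 3 and 4 located data blocks by their ordinal position in the
+    // file; version 6 records each block's offset and sizes in the index entry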
+    private ABlockReader getDataBlock(IndexEntry indexEntry) throws IOException {
+      if (interruptFlag != null && interruptFlag.get())
+        throw new IterationInterruptedException();
+      
+      if (version == RINDEX_VER_3 || version == RINDEX_VER_4)
+        return reader.getDataBlock(startBlock + iiter.previousIndex());
+      else
+        return reader.getDataBlock(indexEntry.getOffset(), indexEntry.getCompressedSize(), indexEntry.getRawSize());
+      
+    }
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+      
+      if (closed)
+        throw new IllegalStateException("Locality group reader closed");
+      
+      if (columnFamilies.size() != 0 || inclusive)
+        throw new IllegalArgumentException("I do not know how to filter column families");
+      
+      if (interruptFlag != null && interruptFlag.get())
+        throw new IterationInterruptedException();
+      
+      try {
+        _seek(range);
+      } catch (IOException ioe) {
+        reset();
+        throw ioe;
+      }
+    }
+    
+    private void reset() {
+      rk = null;
+      hasTop = false;
+      if (currBlock != null) {
+        try {
+          try {
+            currBlock.close();
+          } catch (IOException e) {
+            log.warn("Failed to close block reader", e);
+          }
+        } finally {
+          currBlock = null;
+        }
+      }
+    }
+    
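+    /*
+     * Seeks to the first key in the range. The checks below avoid an index lookup entirely when the reader's current position can already satisfy the
+     * request, for example on short forward seeks within the current block.
+     */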
+    private void _seek(Range range) throws IOException {
+      
+      this.range = range;
+      
+      if (blockCount == 0) {
+        // it's an empty file
+        rk = null;
+        return;
+      }
+      
+      Key startKey = range.getStartKey();
+      if (startKey == null)
+        startKey = new Key();
+      
+      boolean reseek = true;
+      
+      if (range.afterEndKey(firstKey)) {
+        // range is before first key in rfile, so there is nothing to do
+        reset();
+        reseek = false;
+      }
+      
+      if (rk != null) {
+        if (range.beforeStartKey(prevKey) && range.afterEndKey(getTopKey())) {
+          // the range falls entirely between the previous key and the current top key,
+          // where the last seek stopped, so there is nothing to do
+          reseek = false;
+        }
+        
+        if (startKey.compareTo(getTopKey()) <= 0 && startKey.compareTo(prevKey) > 0) {
+          // current location in file can satisfy this request, no need to seek
+          reseek = false;
+        }
+        
+        if (startKey.compareTo(getTopKey()) >= 0 && startKey.compareTo(iiter.peekPrevious().getKey()) <= 0) {
+          // start key is within the unconsumed portion of the current block
+          
+          // this code intentionally does not use the index associated with a cached block
+          // because if only forward seeks are being done, then there is no benefit to building
+          // an index for the block... could consider using the index if it exists, without
+          // triggering the build of an index... doing this could slow down some use cases
+          // and speed up others.
+
+          MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+          RelativeKey tmpRk = new RelativeKey();
+          Key pKey = new Key(getTopKey());
+          int fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, pKey, getTopKey());
+          if (fastSkipped > 0) {
+            entriesLeft -= fastSkipped;
+            val = new Value(valbs.toArray());
+            prevKey = pKey;
+            rk = tmpRk;
+          }
+          
+          reseek = false;
+        }
+        
+        if (iiter.previousIndex() == 0 && getTopKey().equals(firstKey) && startKey.compareTo(firstKey) <= 0) {
+          // seeking before the beginning of the file, and already positioned at the first key in the file
+          // so there is nothing to do
+          reseek = false;
+        }
+      }
+      
+      int fastSkipped = -1;
+      
+      if (reseek) {
+        iiter = index.lookup(startKey);
+        
+        reset();
+        
+        if (!iiter.hasNext()) {
+          // past the last key
+        } else {
+          
+          // if the index contains the same key multiple times, then go to the
+          // earliest index entry containing the key
+          while (iiter.hasPrevious() && iiter.peekPrevious().getKey().equals(iiter.peek().getKey())) {
+            iiter.previous();
+          }
+          
+          if (iiter.hasPrevious())
+            prevKey = new Key(iiter.peekPrevious().getKey()); // initially prevKey is the last key of the prev block
+          else
+            prevKey = new Key(); // first block in the file, so set prev key to minimal key
+            
+          IndexEntry indexEntry = iiter.next();
+          entriesLeft = indexEntry.getNumEntries();
+          currBlock = getDataBlock(indexEntry);
+
+          RelativeKey tmpRk = new RelativeKey();
+          MByteSequence valbs = new MByteSequence(new byte[64], 0, 0);
+
+          Key currKey = null;
+
+          if (currBlock.isIndexable()) {
+            BlockIndex blockIndex = BlockIndex.getIndex(currBlock, indexEntry);
+            if (blockIndex != null) {
+              BlockIndexEntry bie = blockIndex.seekBlock(startKey, currBlock);
+              if (bie != null) {
+                // we are seeked to the current position of the key in the index
+                // need to prime the read process and read this key from the block
+                tmpRk.setPrevKey(bie.getKey());
+                tmpRk.readFields(currBlock);
+                val = new Value();
+
+                val.readFields(currBlock);
+                valbs = new MByteSequence(val.get(), 0, val.getSize());
+                
+                // just consumed one key from the input stream, so subtract one from entries left
+                entriesLeft = bie.getEntriesLeft() - 1;
+                prevKey = new Key(bie.getKey());
+                currKey = bie.getKey();
+              }
+            }
+            
+          }
+
+          fastSkipped = tmpRk.fastSkip(currBlock, startKey, valbs, prevKey, currKey);
+          entriesLeft -= fastSkipped;
+          val = new Value(valbs.toArray());
+          // set rk when everything above is successful, if exception
+          // occurs rk will not be set
+          rk = tmpRk;
+        }
+      }
+      
+      hasTop = rk != null && !range.afterEndKey(rk.getKey());
+      
+      while (hasTop() && range.beforeStartKey(getTopKey())) {
+        next();
+      }
+    }
+    
+    @Override
+    public Key getFirstKey() throws IOException {
+      return firstKey;
+    }
+    
+    @Override
+    public Key getLastKey() throws IOException {
+      if (index.size() == 0)
+        return null;
+      return index.getLastKey();
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void closeDeepCopies() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      this.interruptFlag = flag;
+    }
+  }
+  
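+  /**
+   * Reads the "RFile.index" meta block, builds a LocalityGroupReader per locality group, and merges their streams through the parent HeapIterator. seek()
+   * adds only the locality groups that may contain wanted column families.
+   */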
+  public static class Reader extends HeapIterator implements FileSKVIterator {
+    
+    private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
+    
+    private BlockFileReader reader;
+    
+    private ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<LocalityGroupMetadata>();
+    
+    private LocalityGroupReader lgReaders[];
+    private HashSet<ByteSequence> nonDefaultColumnFamilies;
+    
+    private List<Reader> deepCopies;
+    private boolean deepCopy = false;
+    
+    private AtomicBoolean interruptFlag;
+    
+    public Reader(BlockFileReader rdr) throws IOException {
+      this.reader = rdr;
+      
+      ABlockReader mb = reader.getMetaBlock("RFile.index");
+      
+      int magic = mb.readInt();
+      int ver = mb.readInt();
+      
+      if (magic != RINDEX_MAGIC)
+        throw new IOException("Did not see expected magic number, saw " + magic);
+      if (ver != RINDEX_VER_6 && ver != RINDEX_VER_4 && ver != RINDEX_VER_3)
+        throw new IOException("Did not see expected version, saw " + ver);
+      
+      int size = mb.readInt();
+      lgReaders = new LocalityGroupReader[size];
+      
+      deepCopies = new LinkedList<Reader>();
+      
+      for (int i = 0; i < size; i++) {
+        LocalityGroupMetadata lgm = new LocalityGroupMetadata(ver, rdr);
+        lgm.readFields(mb);
+        localityGroups.add(lgm);
+        
+        lgReaders[i] = new LocalityGroupReader(reader, lgm, ver);
+      }
+      
+      mb.close();
+      
+      nonDefaultColumnFamilies = new HashSet<ByteSequence>();
+      for (LocalityGroupMetadata lgm : localityGroups) {
+        if (!lgm.isDefaultLG)
+          nonDefaultColumnFamilies.addAll(lgm.columnFamilies.keySet());
+      }
+      
+      createHeap(lgReaders.length);
+    }
+    
+    private Reader(Reader r) {
+      super(r.lgReaders.length);
+      this.reader = r.reader;
+      this.nonDefaultColumnFamilies = r.nonDefaultColumnFamilies;
+      this.lgReaders = new LocalityGroupReader[r.lgReaders.length];
+      this.deepCopies = r.deepCopies;
+      this.deepCopy = true;
+      for (int i = 0; i < lgReaders.length; i++) {
+        this.lgReaders[i] = new LocalityGroupReader(r.lgReaders[i]);
+        this.lgReaders[i].setInterruptFlag(r.interruptFlag);
+      }
+    }
+    
+    private void closeLocalityGroupReaders() {
+      for (LocalityGroupReader lgr : lgReaders) {
+        try {
+          lgr.close();
+        } catch (IOException e) {
+          log.warn("Failed to close locality group reader", e);
+        }
+      }
+    }
+    
+    @Override
+    public void closeDeepCopies() {
+      if (deepCopy)
+        throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported");
+      
+      for (Reader deepCopy : deepCopies)
+        deepCopy.closeLocalityGroupReaders();
+      
+      deepCopies.clear();
+    }
+    
+    @Override
+    public void close() throws IOException {
+      if (deepCopy)
+        throw new RuntimeException("Calling close on a deep copy is not supported");
+      
+      closeDeepCopies();
+      closeLocalityGroupReaders();
+      
+      try {
+        reader.close();
+      } finally {
+        // the underlying input stream is passed to CachableBlockFile and closed there
+      }
+    }
+    
+    @Override
+    public Key getFirstKey() throws IOException {
+      if (lgReaders.length == 0) {
+        return null;
+      }
+      
+      Key minKey = null;
+      
+      for (int i = 0; i < lgReaders.length; i++) {
+        if (minKey == null) {
+          minKey = lgReaders[i].getFirstKey();
+        } else {
+          Key firstKey = lgReaders[i].getFirstKey();
+          if (firstKey != null && firstKey.compareTo(minKey) < 0)
+            minKey = firstKey;
+        }
+      }
+      
+      return minKey;
+    }
+    
+    @Override
+    public Key getLastKey() throws IOException {
+      if (lgReaders.length == 0) {
+        return null;
+      }
+      
+      Key maxKey = null;
+      
+      for (int i = 0; i < lgReaders.length; i++) {
+        if (maxKey == null) {
+          maxKey = lgReaders[i].getLastKey();
+        } else {
+          Key lastKey = lgReaders[i].getLastKey();
+          if (lastKey != null && lastKey.compareTo(maxKey) > 0)
+            maxKey = lastKey;
+        }
+      }
+      
+      return maxKey;
+    }
+    
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
+      try {
+        return this.reader.getMetaBlock(name).getStream();
+      } catch (MetaBlockDoesNotExist e) {
+        throw new NoSuchMetaStoreException("name = " + name, e);
+      }
+    }
+    
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      Reader copy = new Reader(this);
+      copy.setInterruptFlagInternal(interruptFlag);
+      deepCopies.add(copy);
+      return copy;
+    }
+    
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+    
+    private int numLGSeeked = 0;
+    
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+      
+      clear();
+      
+      numLGSeeked = 0;
+      
+      Set<ByteSequence> cfSet;
+      if (columnFamilies.size() > 0) {
+        if (columnFamilies instanceof Set<?>) {
+          cfSet = (Set<ByteSequence>) columnFamilies;
+        } else {
+          cfSet = new HashSet<ByteSequence>();
+          cfSet.addAll(columnFamilies);
+        }
+      } else {
+        cfSet = Collections.emptySet();
+      }
+      
+      for (LocalityGroupReader lgr : lgReaders) {
+        
+        // when include is set to true it means this locality group contains
+        // wanted column families
+        boolean include = false;
+        
+        if (cfSet.size() == 0) {
+          include = !inclusive;
+        } else if (lgr.isDefaultLocalityGroup && lgr.columnFamilies == null) {
+          // do not know what column families are in the default locality group,
+          // only know what column families are not in it
+          
+          if (inclusive) {
+            if (!nonDefaultColumnFamilies.containsAll(cfSet)) {
+              // default LG may contain wanted and unwanted column families
+              include = true;
+            }// else - everything wanted is in other locality groups, so nothing to do
+          } else {
+            // must include; if all excluded column families are in other locality groups,
+            // then there are no unwanted column families in the default LG
+            include = true;
+          }
+        } else {
+          /*
+           * Need to consider the following cases for inclusive and exclusive (lgcf: locality group column family set, cf: column family set): lgcf and cf
+           * are disjoint; lgcf and cf are the same; cf contains lgcf; lgcf contains cf; lgcf and cf intersect but neither is a subset of the other.
+           */
+          
+          for (Entry<ByteSequence,Count> entry : lgr.columnFamilies.entrySet())
+            if (entry.getValue().count > 0)
+              if (cfSet.contains(entry.getKey())) {
+                if (inclusive)
+                  include = true;
+              } else if (!inclusive) {
+                include = true;
+              }
+        }
+        
+        if (include) {
+          lgr.seek(range, EMPTY_CF_SET, false);
+          addSource(lgr);
+          numLGSeeked++;
+        }// every column family is excluded, zero count, or not present
+      }
+    }
+    
+    int getNumLocalityGroupsSeeked() {
+      return numLGSeeked;
+    }
+    
+    public FileSKVIterator getIndex() throws IOException {
+      
+      ArrayList<Iterator<IndexEntry>> indexes = new ArrayList<Iterator<IndexEntry>>();
+      
+      for (LocalityGroupReader lgr : lgReaders) {
+        indexes.add(lgr.getIndex());
+      }
+      
+      return new MultiIndexIterator(this, indexes);
+    }
+    
+    public void printInfo() throws IOException {
+      for (LocalityGroupMetadata lgm : localityGroups) {
+        lgm.printInfo();
+      }
+      
+    }
+    
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      if (deepCopy)
+        throw new RuntimeException("Calling setInterruptFlag on a deep copy is not supported");
+      
+      if (deepCopies.size() != 0)
+        throw new RuntimeException("Setting interrupt flag after calling deep copy not supported");
+      
+      setInterruptFlagInternal(flag);
+    }
+    
+    private void setInterruptFlagInternal(AtomicBoolean flag) {
+      this.interruptFlag = flag;
+      for (LocalityGroupReader lgr : lgReaders) {
+        lgr.setInterruptFlag(interruptFlag);
+      }
+    }
+  }
+  
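+  // ad-hoc manual test: writes a gzip-compressed rfile to /tmp/test.rf, then
+  // times opening it and doing random seeks through the reader and deep copies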
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    
+    int max_row = 10000;
+    int max_cf = 10;
+    int max_cq = 10;
+    
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path("/tmp/test.rf"), "gz", conf);
+    RFile.Writer w = new RFile.Writer(_cbw, 100000);
+    
+    w.startDefaultLocalityGroup();
+    
+    int c = 0;
+    
+    for (int i = 0; i < max_row; i++) {
+      Text row = new Text(String.format("R%06d", i));
+      for (int j = 0; j < max_cf; j++) {
+        Text cf = new Text(String.format("CF%06d", j));
+        for (int k = 0; k < max_cq; k++) {
+          Text cq = new Text(String.format("CQ%06d", k));
+          w.append(new Key(row, cf, cq), new Value((c++ + "").getBytes()));
+        }
+      }
+    }
+    
+    w.close();
+
+    long t1 = System.currentTimeMillis();
+    FSDataInputStream fsin = fs.open(new Path("/tmp/test.rf"));
+    long t2 = System.currentTimeMillis();
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, new Path("/tmp/test.rf"), conf, null, null);
+    RFile.Reader r = new RFile.Reader(_cbr);
+    long t3 = System.currentTimeMillis();
+    
+    System.out.println("Open time " + (t2 - t1) + " " + (t3 - t2));
+    
+    SortedKeyValueIterator<Key,Value> rd = r.deepCopy(null);
+    SortedKeyValueIterator<Key,Value> rd2 = r.deepCopy(null);
+    
+    Random rand = new Random(10);
+    
+    seekRandomly(100, max_row, max_cf, max_cq, r, rand);
+    
+    rand = new Random(10);
+    seekRandomly(100, max_row, max_cf, max_cq, rd, rand);
+    
+    rand = new Random(10);
+    seekRandomly(100, max_row, max_cf, max_cq, rd2, rand);
+    
+    r.closeDeepCopies();
+    
+    seekRandomly(100, max_row, max_cf, max_cq, r, rand);
+    
+    rd = r.deepCopy(null);
+    
+    seekRandomly(100, max_row, max_cf, max_cq, rd, rand);
+    
+    r.close();
+    fsin.close();
+    
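+    // the reader was closed above, so this final round of seeks should fail
+    // with an IllegalStateException from the closed locality group readers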
+    seekRandomly(100, max_row, max_cf, max_cq, r, rand);
+  }
+  
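+  // seeks to num random keys known to exist and reports any key the iterator
+  // does not land on exactly, then prints elapsed seconds and seeks per second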
+  private static void seekRandomly(int num, int max_row, int max_cf, int max_cq, SortedKeyValueIterator<Key,Value> rd, Random rand) throws Exception {
+    long t1 = System.currentTimeMillis();
+    
+    for (int i = 0; i < num; i++) {
+      Text row = new Text(String.format("R%06d", rand.nextInt(max_row)));
+      Text cf = new Text(String.format("CF%06d", rand.nextInt(max_cf)));
+      Text cq = new Text(String.format("CQ%06d", rand.nextInt(max_cq)));
+      
+      Key sk = new Key(row, cf, cq);
+      rd.seek(new Range(sk, null), new ArrayList<ByteSequence>(), false);
+      if (!rd.hasTop() || !rd.getTopKey().equals(sk)) {
+        System.err.println(sk + " != " + rd.getTopKey());
+      }
+      
+    }
+    
+    long t2 = System.currentTimeMillis();
+    
+    double delta = ((t2 - t1) / 1000.0);
+    System.out.println("" + delta + " " + num / delta);
+    
+    System.gc();
+    System.gc();
+    System.gc();
+    
+    Thread.sleep(3000);
+  }
+}

