accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] branch master updated: Inline BlockFile interfaces. Fixes #480 (#484)
Date Tue, 15 May 2018 19:25:05 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new a8fbba0  Inline BlockFile interfaces. Fixes #480 (#484)
a8fbba0 is described below

commit a8fbba044a08f2ceff6f16d277fc3fd6484f76e9
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Tue May 15 15:25:00 2018 -0400

    Inline BlockFile interfaces. Fixes #480 (#484)
    
    * Replace BlockRead with CachedBlockRead
    * Replace BlockFileReader with CachableBlockFile.Reader
    * Replace ABlockReader with CachableBlockFile.CachedBlockRead
    * Replace BlockWrite with BCFile.Writer.BlockAppender
    * Replace ABlockWriter with BCFile.Writer.BlockAppender
    * Replace BlockFileWriter with CachableBlockFile.Writer
---
 .../accumulo/core/file/blockfile/ABlockReader.java |  57 -------
 .../accumulo/core/file/blockfile/ABlockWriter.java |  35 -----
 .../core/file/blockfile/BlockFileReader.java       |  39 -----
 .../core/file/blockfile/BlockFileWriter.java       |  36 -----
 .../file/blockfile/impl/CachableBlockFile.java     | 165 +++++----------------
 .../accumulo/core/file/rfile/BlockIndex.java       |   9 +-
 .../accumulo/core/file/rfile/MultiLevelIndex.java  |  22 ++-
 .../org/apache/accumulo/core/file/rfile/RFile.java |  46 +++---
 .../accumulo/core/file/rfile/BlockIndexTest.java   |   5 +-
 .../core/file/rfile/MultiLevelIndexTest.java       |   7 +-
 10 files changed, 79 insertions(+), 342 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
deleted file mode 100644
index dd256d8..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.function.Supplier;
-
-import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
-
-/**
- * Minimal interface to read a block from a block based file
- */
-public interface ABlockReader extends DataInput {
-
-  DataInputStream getStream() throws IOException;
-
-  void close() throws IOException;
-
-  /**
-   * An indexable block supports seeking, getting a position, and associating an arbitrary index
-   * with the block
-   *
-   * @return true, if the block is indexable; otherwise false.
-   */
-  boolean isIndexable();
-
-  void seek(int position);
-
-  /**
-   * Get the file position.
-   *
-   * @return the file position.
-   */
-  int getPosition();
-
-  <T extends Weighbable> T getIndex(Supplier<T> supplier);
-
-  void indexWeightChanged();
-
-  byte[] getBuffer();
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java
deleted file mode 100644
index 7e3242a..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockWriter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Minimal interface to write a block to a block based file
- */
-public interface ABlockWriter extends DataOutput {
-
-  long getCompressedSize() throws IOException;
-
-  void close() throws IOException;
-
-  long getRawSize() throws IOException;
-
-  long getStartPos() throws IOException;
-
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java
deleted file mode 100644
index 26dc126..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileReader.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile;
-
-import java.io.IOException;
-
-/**
- * Provides a generic interface for a Reader for a BlockBaseFile format. Supports the minimal
- * interface required.
- *
- * Read a metaBlock and a dataBlock
- */
-public interface BlockFileReader {
-
-  ABlockReader getMetaBlock(String name) throws IOException;
-
-  ABlockReader getDataBlock(int blockIndex) throws IOException;
-
-  void close() throws IOException;
-
-  ABlockReader getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException;
-
-  ABlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException;
-
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java
deleted file mode 100644
index ab72c35..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/BlockFileWriter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile;
-
-import java.io.IOException;
-
-/**
- * Provides a generic interface for a Writer for a BlockBaseFile format. Supports the minimal
- * interface required.
- *
- * Write a metaBlock and a dataBlock.
- */
-public interface BlockFileWriter {
-
-  ABlockWriter prepareMetaBlock(String name) throws IOException;
-
-  ABlockWriter prepareDataBlock() throws IOException;
-
-  void close() throws IOException;
-
-  long getLength() throws IOException;
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index f7451e3..43e9730 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -16,8 +16,8 @@
  */
 package org.apache.accumulo.core.file.blockfile.impl;
 
+import java.io.Closeable;
 import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -28,17 +28,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.file.blockfile.ABlockReader;
-import org.apache.accumulo.core.file.blockfile.ABlockWriter;
-import org.apache.accumulo.core.file.blockfile.BlockFileReader;
-import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache.Loader;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
-import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 import org.apache.accumulo.core.file.streams.PositionedOutput;
 import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
@@ -63,9 +58,9 @@ public class CachableBlockFile {
 
   private static final Logger log = LoggerFactory.getLogger(CachableBlockFile.class);
 
-  public static class Writer implements BlockFileWriter {
+  public static class Writer implements Closeable {
     private BCFile.Writer _bc;
-    private BlockWrite _bw;
+    private BCFile.Writer.BlockAppender _bw;
     private final PositionedOutput fsout;
     private long length = 0;
 
@@ -88,15 +83,13 @@ public class CachableBlockFile {
       _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
     }
 
-    @Override
-    public ABlockWriter prepareMetaBlock(String name) throws IOException {
-      _bw = new BlockWrite(_bc.prepareMetaBlock(name));
+    public BCFile.Writer.BlockAppender prepareMetaBlock(String name) throws IOException {
+      _bw = _bc.prepareMetaBlock(name);
       return _bw;
     }
 
-    @Override
-    public ABlockWriter prepareDataBlock() throws IOException {
-      _bw = new BlockWrite(_bc.prepareDataBlock());
+    public BCFile.Writer.BlockAppender prepareDataBlock() throws IOException {
+      _bw = _bc.prepareDataBlock();
       return _bw;
     }
 
@@ -110,52 +103,20 @@ public class CachableBlockFile {
       ((OutputStream) this.fsout).close();
     }
 
-    @Override
     public long getLength() throws IOException {
       return length;
     }
 
   }
 
-  public static class BlockWrite extends DataOutputStream implements ABlockWriter {
-    BlockAppender _ba;
-
-    public BlockWrite(BlockAppender ba) {
-      super(ba);
-      this._ba = ba;
-    }
-
-    @Override
-    public long getCompressedSize() throws IOException {
-      return _ba.getCompressedSize();
-    }
-
-    @Override
-    public long getRawSize() throws IOException {
-      return _ba.getRawSize();
-    }
-
-    @Override
-    public void close() throws IOException {
-
-      _ba.close();
-    }
-
-    @Override
-    public long getStartPos() throws IOException {
-      return _ba.getStartPos();
-    }
-
-  }
-
-  private interface IoeSupplier<T> {
+  private static interface IoeSupplier<T> {
     T get() throws IOException;
   }
 
   /**
    * Class wraps the BCFile reader.
    */
-  public static class Reader implements BlockFileReader {
+  public static class Reader implements Closeable {
     private final RateLimiter readLimiter;
     // private BCFile.Reader _bc;
     private final String cacheId;
@@ -402,8 +363,7 @@ public class CachableBlockFile {
      * It is intended that once the BlockRead object is returned to the caller, that the caller will
      * read the entire block and then call close on the BlockRead class.
      */
-    @Override
-    public BlockRead getMetaBlock(String blockName) throws IOException {
+    public CachedBlockRead getMetaBlock(String blockName) throws IOException {
       if (_iCache != null) {
         String _lookup = this.cacheId + "M" + blockName;
         try {
@@ -423,11 +383,10 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
-      return new BlockRead(_currBlock);
+      return new CachedBlockRead(_currBlock);
     }
 
-    @Override
-    public ABlockReader getMetaBlock(long offset, long compressedSize, long rawSize)
+    public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSize)
         throws IOException {
       if (_iCache != null) {
         String _lookup = this.cacheId + "R" + offset;
@@ -439,7 +398,7 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize);
-      return new BlockRead(_currBlock);
+      return new CachedBlockRead(_currBlock);
     }
 
     /**
@@ -450,8 +409,7 @@ public class CachableBlockFile {
      * read from disk and other threads check the cache before it has been inserted.
      */
 
-    @Override
-    public BlockRead getDataBlock(int blockIndex) throws IOException {
+    public CachedBlockRead getDataBlock(int blockIndex) throws IOException {
       if (_dCache != null) {
         String _lookup = this.cacheId + "O" + blockIndex;
         CacheEntry ce = _dCache.getBlock(_lookup, new OffsetBlockLoader(blockIndex, false));
@@ -461,11 +419,10 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile().getDataBlock(blockIndex);
-      return new BlockRead(_currBlock);
+      return new CachedBlockRead(_currBlock);
     }
 
-    @Override
-    public ABlockReader getDataBlock(long offset, long compressedSize, long rawSize)
+    public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSize)
         throws IOException {
       if (_dCache != null) {
         String _lookup = this.cacheId + "R" + offset;
@@ -477,7 +434,7 @@ public class CachableBlockFile {
       }
 
       BlockReader _currBlock = getBCFile().getDataBlock(offset, compressedSize, rawSize);
-      return new BlockRead(_currBlock);
+      return new CachedBlockRead(_currBlock);
     }
 
     @Override
@@ -502,9 +459,17 @@ public class CachableBlockFile {
 
   }
 
-  public static class CachedBlockRead extends BlockRead {
+  public static class CachedBlockRead extends DataInputStream {
     private SeekableByteArrayInputStream seekableInput;
     private final CacheEntry cb;
+    boolean indexable;
+
+    public CachedBlockRead(InputStream in) {
+      super(in);
+      cb = null;
+      seekableInput = null;
+      indexable = false;
+    }
 
     public CachedBlockRead(CacheEntry cb, byte buf[]) {
       this(new SeekableByteArrayInputStream(buf), cb);
@@ -514,96 +479,40 @@ public class CachableBlockFile {
       super(seekableInput);
       this.seekableInput = seekableInput;
       this.cb = cb;
+      indexable = true;
+    }
+
+    /**
+     * It is intended that the caller of this method will close the stream we also only intend that
+     * this be called once per BlockRead. This method is provide for methods up stream that expect
+     * to receive a DataInputStream object.
+     */
+    public DataInputStream getStream() {
+      return this;
     }
 
-    @Override
     public void seek(int position) {
       seekableInput.seek(position);
     }
 
-    @Override
     public int getPosition() {
       return seekableInput.getPosition();
     }
 
-    @Override
     public boolean isIndexable() {
-      return true;
+      return indexable;
     }
 
-    @Override
     public byte[] getBuffer() {
       return seekableInput.getBuffer();
     }
 
-    @Override
     public <T extends Weighbable> T getIndex(Supplier<T> indexSupplier) {
       return cb.getIndex(indexSupplier);
     }
 
-    @Override
     public void indexWeightChanged() {
       cb.indexWeightChanged();
     }
   }
-
-  /**
-   *
-   * Class provides functionality to read one block from the underlying BCFile Since We are caching
-   * blocks in the Reader class as bytearrays, this class will wrap a
-   * DataInputStream(ByteArrayStream(cachedBlock)).
-   *
-   *
-   */
-  public static class BlockRead extends DataInputStream implements ABlockReader {
-
-    public BlockRead(InputStream in) {
-      super(in);
-    }
-
-    /**
-     * It is intended that the caller of this method will close the stream we also only intend that
-     * this be called once per BlockRead. This method is provide for methods up stream that expect
-     * to receive a DataInputStream object.
-     */
-    @Override
-    public DataInputStream getStream() throws IOException {
-      return this;
-    }
-
-    @Override
-    public boolean isIndexable() {
-      return false;
-    }
-
-    @Override
-    public void seek(int position) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int getPosition() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T extends Weighbable> T getIndex(Supplier<T> clazz) {
-      throw new UnsupportedOperationException();
-    }
-
-    /**
-     * The byte array returned by this method is only for read optimizations, it should not be
-     * modified.
-     */
-    @Override
-    public byte[] getBuffer() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void indexWeightChanged() {
-      throw new UnsupportedOperationException();
-    }
-
-  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
index fa302b4..c48de2d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.file.rfile;
 
+import static org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.CachedBlockRead;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -23,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.blockfile.ABlockReader;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
 import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
 import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
@@ -31,7 +32,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 
 public class BlockIndex implements Weighbable {
 
-  public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry)
+  public static BlockIndex getIndex(CachedBlockRead cacheBlock, IndexEntry indexEntry)
       throws IOException {
 
     BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex::new);
@@ -112,7 +113,7 @@ public class BlockIndex implements Weighbable {
     }
   }
 
-  public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) {
+  public BlockIndexEntry seekBlock(Key startKey, CachedBlockRead cacheBlock) {
 
     // get a local ref to the index, another thread could change it
     BlockIndexEntry[] blockIndex = this.blockIndex;
@@ -156,7 +157,7 @@ public class BlockIndex implements Weighbable {
     return bie;
   }
 
-  private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock,
+  private synchronized void buildIndex(int indexEntries, CachedBlockRead cacheBlock,
       IndexEntry indexEntry) throws IOException {
     cacheBlock.seek(0);
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index a403add..992a3ec 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -36,11 +36,9 @@ import java.util.Map;
 import java.util.RandomAccess;
 
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.file.blockfile.ABlockReader;
-import org.apache.accumulo.core.file.blockfile.ABlockWriter;
-import org.apache.accumulo.core.file.blockfile.BlockFileReader;
-import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.blockfile.impl.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -321,7 +319,7 @@ public class MultiLevelIndex {
         offset = in.readInt();
         hasNext = in.readBoolean();
 
-        ABlockReader abr = (ABlockReader) in;
+        CachableBlockFile.CachedBlockRead abr = (CachableBlockFile.CachedBlockRead) in;
         if (abr.isIndexable()) {
           // this block is cahced, so avoid copy
           data = abr.getBuffer();
@@ -509,9 +507,9 @@ public class MultiLevelIndex {
 
     private boolean addedLast = false;
 
-    private BlockFileWriter blockFileWriter;
+    private CachableBlockFile.Writer blockFileWriter;
 
-    Writer(BlockFileWriter blockFileWriter, int maxBlockSize) {
+    Writer(CachableBlockFile.Writer blockFileWriter, int maxBlockSize) {
       this.blockFileWriter = blockFileWriter;
       this.threshold = maxBlockSize;
       levels = new ArrayList<>();
@@ -535,7 +533,7 @@ public class MultiLevelIndex {
 
       IndexBlock iblock = levels.get(level);
       if ((iblock.getSize() > threshold && iblock.offsets.size() > 1) || last) {
-        ABlockWriter out = blockFileWriter.prepareDataBlock();
+        BCFile.Writer.BlockAppender out = blockFileWriter.prepareDataBlock();
         iblock.setHasNext(!last);
         iblock.write(out);
         out.close();
@@ -586,7 +584,7 @@ public class MultiLevelIndex {
 
   public static class Reader {
     private IndexBlock rootBlock;
-    private BlockFileReader blockStore;
+    private CachableBlockFile.Reader blockStore;
     private int version;
     private int size;
 
@@ -799,15 +797,15 @@ public class MultiLevelIndex {
 
     }
 
-    public Reader(BlockFileReader blockStore, int version) {
+    public Reader(CachableBlockFile.Reader blockStore, int version) {
       this.version = version;
       this.blockStore = blockStore;
     }
 
     private IndexBlock getIndexBlock(IndexEntry ie) throws IOException {
       IndexBlock iblock = new IndexBlock();
-      ABlockReader in = blockStore.getMetaBlock(ie.getOffset(), ie.getCompressedSize(),
-          ie.getRawSize());
+      CachableBlockFile.CachedBlockRead in = blockStore.getMetaBlock(ie.getOffset(),
+          ie.getCompressedSize(), ie.getRawSize());
       iblock.readFields(in, version);
       in.close();
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 2ec1245..7247b60 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -51,14 +51,12 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.NoSuchMetaStoreException;
-import org.apache.accumulo.core.file.blockfile.ABlockReader;
-import org.apache.accumulo.core.file.blockfile.ABlockWriter;
-import org.apache.accumulo.core.file.blockfile.BlockFileReader;
-import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
@@ -159,13 +157,14 @@ public class RFile {
     private MultiLevelIndex.Reader indexReader;
     private int version;
 
-    public LocalityGroupMetadata(int version, BlockFileReader br) {
+    public LocalityGroupMetadata(int version, CachableBlockFile.Reader br) {
       columnFamilies = new HashMap<>();
       indexReader = new MultiLevelIndex.Reader(br, version);
       this.version = version;
     }
 
-    public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, BlockFileWriter bfw) {
+    public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize,
+        CachableBlockFile.Writer bfw) {
       isDefaultLG = true;
       columnFamilies = new HashMap<>();
       previousColumnFamilies = pcf;
@@ -175,7 +174,7 @@ public class RFile {
     }
 
     public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int indexBlockSize,
-        BlockFileWriter bfw) {
+        CachableBlockFile.Writer bfw) {
       this.name = name;
       isDefaultLG = false;
       columnFamilies = new HashMap<>();
@@ -423,10 +422,9 @@ public class RFile {
 
   private static class LocalityGroupWriter {
 
-    private BlockFileWriter fileWriter;
-    private ABlockWriter blockWriter;
+    private CachableBlockFile.Writer fileWriter;
+    private BlockAppender blockWriter;
 
-    // private BlockAppender blockAppender;
     private final long blockSize;
     private final long maxBlockSize;
     private int entries = 0;
@@ -443,7 +441,7 @@ public class RFile {
     private RollingStats keyLenStats = new RollingStats(2017);
     private double averageKeySize = 0;
 
-    LocalityGroupWriter(BlockFileWriter fileWriter, long blockSize, long maxBlockSize,
+    LocalityGroupWriter(CachableBlockFile.Writer fileWriter, long blockSize, long maxBlockSize,
         LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) {
       this.fileWriter = fileWriter;
       this.blockSize = blockSize;
@@ -554,9 +552,8 @@ public class RFile {
     public static final int MAX_CF_IN_DLG = 1000;
     private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 
-    private BlockFileWriter fileWriter;
+    private CachableBlockFile.Writer fileWriter;
 
-    // private BlockAppender blockAppender;
     private final long blockSize;
     private final long maxBlockSize;
     private final int indexBlockSize;
@@ -578,12 +575,12 @@ public class RFile {
     private SamplerConfigurationImpl samplerConfig;
     private Sampler sampler;
 
-    public Writer(BlockFileWriter bfw, int blockSize) throws IOException {
+    public Writer(CachableBlockFile.Writer bfw, int blockSize) throws IOException {
       this(bfw, blockSize, (int) DefaultConfiguration.getInstance()
           .getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX), null, null);
     }
 
-    public Writer(BlockFileWriter bfw, int blockSize, int indexBlockSize,
+    public Writer(CachableBlockFile.Writer bfw, int blockSize, int indexBlockSize,
         SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException {
       this.blockSize = blockSize;
       this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
@@ -603,7 +600,7 @@ public class RFile {
 
       closeData();
 
-      ABlockWriter mba = fileWriter.prepareMetaBlock("RFile.index");
+      BlockAppender mba = fileWriter.prepareMetaBlock("RFile.index");
 
       mba.writeInt(RINDEX_MAGIC);
       mba.writeInt(RINDEX_VER_8);
@@ -748,7 +745,7 @@ public class RFile {
 
   private static class LocalityGroupReader extends LocalityGroup implements FileSKVIterator {
 
-    private BlockFileReader reader;
+    private CachableBlockFile.Reader reader;
     private MultiLevelIndex.Reader index;
     private int blockCount;
     private Key firstKey;
@@ -757,8 +754,8 @@ public class RFile {
     private int version;
     private boolean checkRange = true;
 
-    private LocalityGroupReader(BlockFileReader reader, LocalityGroupMetadata lgm, int version)
-        throws IOException {
+    private LocalityGroupReader(CachableBlockFile.Reader reader, LocalityGroupMetadata lgm,
+        int version) throws IOException {
       super(lgm.columnFamilies, lgm.isDefaultLG);
       this.firstKey = lgm.firstKey;
       this.index = lgm.indexReader;
@@ -795,7 +792,7 @@ public class RFile {
 
     private IndexIterator iiter;
     private int entriesLeft;
-    private ABlockReader currBlock;
+    private CachableBlockFile.CachedBlockRead currBlock;
     private RelativeKey rk;
     private Value val;
     private Key prevKey = null;
@@ -867,7 +864,8 @@ public class RFile {
         hasTop = !range.afterEndKey(rk.getKey());
     }
 
-    private ABlockReader getDataBlock(IndexEntry indexEntry) throws IOException {
+    private CachableBlockFile.CachedBlockRead getDataBlock(IndexEntry indexEntry)
+        throws IOException {
       if (interruptFlag != null && interruptFlag.get())
         throw new IterationInterruptedException();
 
@@ -1132,7 +1130,7 @@ public class RFile {
 
   public static class Reader extends HeapIterator implements FileSKVIterator {
 
-    private final BlockFileReader reader;
+    private final CachableBlockFile.Reader reader;
 
     private final ArrayList<LocalityGroupMetadata> localityGroups = new ArrayList<>();
     private final ArrayList<LocalityGroupMetadata> sampleGroups = new ArrayList<>();
@@ -1152,10 +1150,10 @@ public class RFile {
 
     private int rfileVersion;
 
-    public Reader(BlockFileReader rdr) throws IOException {
+    public Reader(CachableBlockFile.Reader rdr) throws IOException {
       this.reader = rdr;
 
-      ABlockReader mb = reader.getMetaBlock("RFile.index");
+      CachableBlockFile.CachedBlockRead mb = reader.getMetaBlock("RFile.index");
       try {
         int magic = mb.readInt();
         int ver = mb.readInt();
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
index 2ec6ab9..1458e11 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
@@ -23,7 +23,6 @@ import java.util.function.Supplier;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.blockfile.ABlockReader;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
@@ -80,7 +79,7 @@ public class BlockIndexTest {
 
     CacheEntry ce = new MyCacheEntry(data);
 
-    ABlockReader cacheBlock = new CachableBlockFile.CachedBlockRead(ce, data);
+    CachableBlockFile.CachedBlockRead cacheBlock = new CachableBlockFile.CachedBlockRead(ce, data);
     BlockIndex blockIndex = null;
 
     for (int i = 0; i < 129; i++)
@@ -152,7 +151,7 @@ public class BlockIndexTest {
 
     CacheEntry ce = new MyCacheEntry(data);
 
-    ABlockReader cacheBlock = new CachableBlockFile.CachedBlockRead(ce, data);
+    CachableBlockFile.CachedBlockRead cacheBlock = new CachableBlockFile.CachedBlockRead(ce, data);
     BlockIndex blockIndex = null;
 
     for (int i = 0; i < 257; i++)
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 0b69407..656e7da 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -23,15 +23,14 @@ import java.util.Random;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.file.blockfile.ABlockWriter;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.BufferedWriter;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -67,7 +66,7 @@ public class MultiLevelIndexTest extends TestCase {
 
     mliw.addLast(new Key(String.format("%05d000", num)), num, 0, 0, 0);
 
-    ABlockWriter root = _cbw.prepareMetaBlock("root");
+    BCFile.Writer.BlockAppender root = _cbw.prepareMetaBlock("root");
     mliw.close(root);
     root.close();
 
@@ -82,7 +81,7 @@ public class MultiLevelIndexTest extends TestCase {
         CachedConfiguration.getInstance(), aconf);
 
     Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
-    BlockRead rootIn = _cbr.getMetaBlock("root");
+    CachableBlockFile.CachedBlockRead rootIn = _cbr.getMetaBlock("root");
     reader.readFields(rootIn);
     rootIn.close();
     IndexIterator liter = reader.lookup(new Key("000000"));

-- 
To stop receiving notification emails like this one, please contact
mmiller@apache.org.

Mime
View raw message