accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] branch master updated: Inline inner Rfile classes and interfaces (#487)
Date Fri, 18 May 2018 21:14:37 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 40cbc60  Inline inner Rfile classes and interfaces (#487)
40cbc60 is described below

commit 40cbc60ed7fcb527b1fa57d935baa25d8d47eae4
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Fri May 18 17:14:32 2018 -0400

    Inline inner Rfile classes and interfaces (#487)
    
    * Eliminated CachableBlockFile.Writer by inlining BCFile.Writer
    * Made BCFile close its OutputStream
    * Eliminated PositionedOutputs wrapping and PositionedOutput interface
    * Made RateLimitedOutputStream extend DataOutputStream and take FSDataOutputStream
---
 .../file/blockfile/impl/CachableBlockFile.java     | 63 ---------------
 .../accumulo/core/file/rfile/MultiLevelIndex.java  |  4 +-
 .../org/apache/accumulo/core/file/rfile/RFile.java | 20 ++---
 .../accumulo/core/file/rfile/RFileOperations.java  |  7 +-
 .../accumulo/core/file/rfile/SplitLarge.java       |  5 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java    | 93 ++++++++--------------
 .../file/streams/PositionedDataOutputStream.java   | 37 ---------
 .../core/file/streams/PositionedOutput.java        | 27 -------
 .../core/file/streams/PositionedOutputs.java       | 73 -----------------
 .../core/file/streams/RateLimitedOutputStream.java | 14 ++--
 .../core/file/rfile/CreateCompatTestFile.java      |  6 +-
 .../core/file/rfile/MultiLevelIndexTest.java       |  5 +-
 .../core/file/rfile/MultiThreadedRFileTest.java    |  5 +-
 .../apache/accumulo/core/file/rfile/RFileTest.java |  5 +-
 .../file/streams/RateLimitedOutputStreamTest.java  | 13 +--
 15 files changed, 70 insertions(+), 307 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 43e9730..de3d88d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -20,7 +20,6 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.Map;
@@ -35,9 +34,7 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
-import org.apache.accumulo.core.file.streams.PositionedOutput;
 import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,57 +55,6 @@ public class CachableBlockFile {
 
   private static final Logger log = LoggerFactory.getLogger(CachableBlockFile.class);
 
-  public static class Writer implements Closeable {
-    private BCFile.Writer _bc;
-    private BCFile.Writer.BlockAppender _bw;
-    private final PositionedOutput fsout;
-    private long length = 0;
-
-    public Writer(FileSystem fs, Path fName, String compressAlgor, RateLimiter writeLimiter,
-        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this(new RateLimitedOutputStream(fs.create(fName), writeLimiter), compressAlgor, conf,
-          accumuloConfiguration);
-    }
-
-    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fsout,
-        String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      this.fsout = fsout;
-      init(fsout, compressAlgor, conf, accumuloConfiguration);
-    }
-
-    private <OutputStreamT extends OutputStream & PositionedOutput> void init(OutputStreamT fsout,
-        String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
-    }
-
-    public BCFile.Writer.BlockAppender prepareMetaBlock(String name) throws IOException {
-      _bw = _bc.prepareMetaBlock(name);
-      return _bw;
-    }
-
-    public BCFile.Writer.BlockAppender prepareDataBlock() throws IOException {
-      _bw = _bc.prepareDataBlock();
-      return _bw;
-    }
-
-    @Override
-    public void close() throws IOException {
-
-      _bw.close();
-      _bc.close();
-
-      length = this.fsout.position();
-      ((OutputStream) this.fsout).close();
-    }
-
-    public long getLength() throws IOException {
-      return length;
-    }
-
-  }
-
   private static interface IoeSupplier<T> {
     T get() throws IOException;
   }
@@ -482,15 +428,6 @@ public class CachableBlockFile {
       indexable = true;
     }
 
-    /**
-     * It is intended that the caller of this method will close the stream we also only intend that
-     * this be called once per BlockRead. This method is provide for methods up stream that expect
-     * to receive a DataInputStream object.
-     */
-    public DataInputStream getStream() {
-      return this;
-    }
-
     public void seek(int position) {
       seekableInput.seek(position);
     }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
index 992a3ec..a71c79e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/MultiLevelIndex.java
@@ -507,9 +507,9 @@ public class MultiLevelIndex {
 
     private boolean addedLast = false;
 
-    private CachableBlockFile.Writer blockFileWriter;
+    private BCFile.Writer blockFileWriter;
 
-    Writer(CachableBlockFile.Writer blockFileWriter, int maxBlockSize) {
+    Writer(BCFile.Writer blockFileWriter, int maxBlockSize) {
       this.blockFileWriter = blockFileWriter;
       this.threshold = maxBlockSize;
       levels = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
index 7247b60..0c85066 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java
@@ -56,6 +56,7 @@ import org.apache.accumulo.core.file.rfile.BlockIndex.BlockIndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.RelativeKey.SkippR;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
 import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
@@ -163,8 +164,7 @@ public class RFile {
       this.version = version;
     }
 
-    public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize,
-        CachableBlockFile.Writer bfw) {
+    public LocalityGroupMetadata(Set<ByteSequence> pcf, int indexBlockSize, BCFile.Writer bfw) {
       isDefaultLG = true;
       columnFamilies = new HashMap<>();
       previousColumnFamilies = pcf;
@@ -174,7 +174,7 @@ public class RFile {
     }
 
     public LocalityGroupMetadata(String name, Set<ByteSequence> cfset, int indexBlockSize,
-        CachableBlockFile.Writer bfw) {
+        BCFile.Writer bfw) {
       this.name = name;
       isDefaultLG = false;
       columnFamilies = new HashMap<>();
@@ -422,7 +422,7 @@ public class RFile {
 
   private static class LocalityGroupWriter {
 
-    private CachableBlockFile.Writer fileWriter;
+    private BCFile.Writer fileWriter;
     private BlockAppender blockWriter;
 
     private final long blockSize;
@@ -441,7 +441,7 @@ public class RFile {
     private RollingStats keyLenStats = new RollingStats(2017);
     private double averageKeySize = 0;
 
-    LocalityGroupWriter(CachableBlockFile.Writer fileWriter, long blockSize, long maxBlockSize,
+    LocalityGroupWriter(BCFile.Writer fileWriter, long blockSize, long maxBlockSize,
         LocalityGroupMetadata currentLocalityGroup, SampleLocalityGroupWriter sample) {
       this.fileWriter = fileWriter;
       this.blockSize = blockSize;
@@ -552,7 +552,7 @@ public class RFile {
     public static final int MAX_CF_IN_DLG = 1000;
     private static final double MAX_BLOCK_MULTIPLIER = 1.1;
 
-    private CachableBlockFile.Writer fileWriter;
+    private BCFile.Writer fileWriter;
 
     private final long blockSize;
     private final long maxBlockSize;
@@ -575,12 +575,12 @@ public class RFile {
     private SamplerConfigurationImpl samplerConfig;
     private Sampler sampler;
 
-    public Writer(CachableBlockFile.Writer bfw, int blockSize) throws IOException {
+    public Writer(BCFile.Writer bfw, int blockSize) throws IOException {
       this(bfw, blockSize, (int) DefaultConfiguration.getInstance()
           .getAsBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX), null, null);
     }
 
-    public Writer(CachableBlockFile.Writer bfw, int blockSize, int indexBlockSize,
+    public Writer(BCFile.Writer bfw, int blockSize, int indexBlockSize,
         SamplerConfigurationImpl samplerConfig, Sampler sampler) throws IOException {
       this.blockSize = blockSize;
       this.maxBlockSize = (long) (blockSize * MAX_BLOCK_MULTIPLIER);
@@ -662,7 +662,7 @@ public class RFile {
     public DataOutputStream createMetaStore(String name) throws IOException {
       closeData();
 
-      return (DataOutputStream) fileWriter.prepareMetaBlock(name);
+      return fileWriter.prepareMetaBlock(name);
     }
 
     private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
@@ -1340,7 +1340,7 @@ public class RFile {
     @Override
     public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
       try {
-        return this.reader.getMetaBlock(name).getStream();
+        return this.reader.getMetaBlock(name);
       } catch (MetaBlockDoesNotExist e) {
         throw new NoSuchMetaStoreException("name = " + name, e);
       }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index 0054db2..5d8705e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -30,7 +30,7 @@ import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
-import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -128,9 +128,8 @@ public class RFileOperations extends FileOperations {
       outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
     }
 
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(
-        new RateLimitedOutputStream(outputStream, options.getRateLimiter()), compression, conf,
-        acuconf);
+    BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression,
+        conf, acuconf);
 
     return new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index c3663cd..48f2873 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
 import org.apache.accumulo.core.file.rfile.RFile.Writer;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,10 +72,10 @@ public class SplitLarge {
         int blockSize = (int) aconf.getAsBytes(Property.TABLE_FILE_BLOCK_SIZE);
         try (
             Writer small = new RFile.Writer(
-                new CachableBlockFile.Writer(fs, new Path(smallName), "gz", null, conf, aconf),
+                new BCFile.Writer(fs.create(new Path(smallName)), null, "gz", conf, aconf),
                 blockSize);
             Writer large = new RFile.Writer(
-                new CachableBlockFile.Writer(fs, new Path(largeName), "gz", null, conf, aconf),
+                new BCFile.Writer(fs.create(new Path(largeName)), null, "gz", conf, aconf),
                 blockSize)) {
           small.startDefaultLocalityGroup();
           large.startDefaultLocalityGroup();
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index f367ed0..9abbef6 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -42,16 +42,17 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
 import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
 import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
-import org.apache.accumulo.core.file.streams.PositionedDataOutputStream;
-import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.accumulo.core.security.crypto.CryptoModule;
 import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.compress.Compressor;
@@ -91,7 +92,7 @@ public final class BCFile {
    * BCFile writer, the entry point for creating a new BCFile.
    */
   static public class Writer implements Closeable {
-    private final PositionedDataOutputStream out;
+    private final RateLimitedOutputStream out;
     private final Configuration conf;
     private final CryptoModule cryptoModule;
     private BCFileCryptoModuleParameters cryptoParams;
@@ -106,23 +107,10 @@ public final class BCFile {
     long errorCount = 0;
     // reusable buffers.
     private BytesWritable fsOutputBuffer;
+    private long length = 0;
 
-    /**
-     * Call-back interface to register a block after a block is closed.
-     */
-    private interface BlockRegister {
-      /**
-       * Register a block that is fully closed.
-       *
-       * @param raw
-       *          The size of block in terms of uncompressed bytes.
-       * @param offsetStart
-       *          The start offset of the block.
-       * @param offsetEnd
-       *          One byte after the end of the block. Compressed block size is offsetEnd -
-       *          offsetStart.
-       */
-      void register(long raw, long offsetStart, long offsetEnd);
+    public long getLength() {
+      return this.length;
     }
 
     /**
@@ -132,7 +120,7 @@ public final class BCFile {
       private final Algorithm compressAlgo;
       private Compressor compressor; // !null only if using native
       // Hadoop compression
-      private final PositionedDataOutputStream fsOut;
+      private final RateLimitedOutputStream fsOut;
       private final OutputStream cipherOut;
       private final long posStart;
       private final SimpleBufferedOutputStream fsBufferedOutput;
@@ -144,7 +132,7 @@ public final class BCFile {
        * @param cryptoModule
        *          the module to use to obtain cryptographic streams
        */
-      public WBlockState(Algorithm compressionAlgo, PositionedDataOutputStream fsOut,
+      public WBlockState(Algorithm compressionAlgo, RateLimitedOutputStream fsOut,
           BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule,
           CryptoModuleParameters cryptoParams) throws IOException {
         this.compressAlgo = compressionAlgo;
@@ -269,21 +257,27 @@ public final class BCFile {
      *
      */
     public class BlockAppender extends DataOutputStream {
-      private final BlockRegister blockRegister;
+      private final MetaBlockRegister metaBlockRegister;
       private final WBlockState wBlkState;
       private boolean closed = false;
 
       /**
        * Constructor
        *
-       * @param register
+       * @param metaBlockRegister
        *          the block register, which is called when the block is closed.
        * @param wbs
        *          The writable compression block state.
        */
-      BlockAppender(BlockRegister register, WBlockState wbs) {
+      BlockAppender(MetaBlockRegister metaBlockRegister, WBlockState wbs) {
         super(wbs.getOutputStream());
-        this.blockRegister = register;
+        this.metaBlockRegister = metaBlockRegister;
+        this.wBlkState = wbs;
+      }
+
+      BlockAppender(WBlockState wbs) {
+        super(wbs.getOutputStream());
+        this.metaBlockRegister = null;
         this.wBlkState = wbs;
       }
 
@@ -334,7 +328,9 @@ public final class BCFile {
         try {
           ++errorCount;
           wBlkState.finish();
-          blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos());
+          if (metaBlockRegister != null)
+            metaBlockRegister.register(getRawSize(), wBlkState.getStartPos(),
+                wBlkState.getCurrentPos());
           --errorCount;
         } finally {
           closed = true;
@@ -352,16 +348,15 @@ public final class BCFile {
      *          Name of the compression algorithm, which will be used for all data blocks.
      * @see Compression#getSupportedAlgorithms
      */
-    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fout,
-        String compressionName, Configuration conf, boolean trackDataBlocks,
-        AccumuloConfiguration accumuloConfiguration) throws IOException {
-      if (fout.position() != 0) {
+    public Writer(FSDataOutputStream fout, RateLimiter writeLimiter, String compressionName,
+        Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+      if (fout.getPos() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
 
-      this.out = new PositionedDataOutputStream(fout);
+      this.out = new RateLimitedOutputStream(fout, writeLimiter);
       this.conf = conf;
-      dataIndex = new DataIndex(compressionName, trackDataBlocks);
+      dataIndex = new DataIndex(compressionName);
       metaIndex = new MetaIndex();
       fsOutputBuffer = new BytesWritable();
       Magic.write(this.out);
@@ -424,6 +419,8 @@ public final class BCFile {
 
           Magic.write(out);
           out.flush();
+          length = out.position();
+          out.close();
         }
       } finally {
         closed = true;
@@ -485,11 +482,9 @@ public final class BCFile {
         throw new IllegalStateException("Cannot create Data Block after Meta Blocks.");
       }
 
-      DataBlockRegister dbr = new DataBlockRegister();
-
       WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf,
           cryptoModule, cryptoParams);
-      BlockAppender ba = new BlockAppender(dbr, wbs);
+      BlockAppender ba = new BlockAppender(wbs);
       blkInProgress = true;
       return ba;
     }
@@ -497,7 +492,7 @@ public final class BCFile {
     /**
      * Callback to make sure a meta block is added to the internal list when its stream is closed.
      */
-    private class MetaBlockRegister implements BlockRegister {
+    private class MetaBlockRegister {
       private final String name;
       private final Algorithm compressAlgo;
 
@@ -506,27 +501,11 @@ public final class BCFile {
         this.compressAlgo = compressAlgo;
       }
 
-      @Override
       public void register(long raw, long begin, long end) {
         metaIndex.addEntry(
             new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw)));
       }
     }
-
-    /**
-     * Callback to make sure a data block is added to the internal list when it's being closed.
-     *
-     */
-    private class DataBlockRegister implements BlockRegister {
-      DataBlockRegister() {
-        // do nothing
-      }
-
-      @Override
-      public void register(long raw, long begin, long end) {
-        dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
-      }
-    }
   }
 
   // sha256 of some random data
@@ -1083,8 +1062,6 @@ public final class BCFile {
     // and raw size
     private final ArrayList<BlockRegion> listRegions;
 
-    private boolean trackBlocks;
-
     // for read, deserialized from a file
     public DataIndex(DataInput in) throws IOException {
       defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in));
@@ -1099,8 +1076,7 @@ public final class BCFile {
     }
 
     // for write
-    public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) {
-      this.trackBlocks = trackBlocks;
+    public DataIndex(String defaultCompressionAlgorithmName) {
       this.defaultCompressionAlgorithm = Compression
           .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
       listRegions = new ArrayList<>();
@@ -1114,11 +1090,6 @@ public final class BCFile {
       return listRegions;
     }
 
-    public void addBlockRegion(BlockRegion region) {
-      if (trackBlocks)
-        listRegions.add(region);
-    }
-
     public void write(DataOutput out) throws IOException {
       Utils.writeString(out, defaultCompressionAlgorithm.getName());
 
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
deleted file mode 100644
index 419e6b3..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A filter converting a {@link PositionedOutput} {@code OutputStream} to a {@code PositionedOutput}
- * {@code DataOutputStream}
- */
-public class PositionedDataOutputStream extends DataOutputStream implements PositionedOutput {
-  public <StreamType extends OutputStream & PositionedOutput> PositionedDataOutputStream(
-      StreamType type) {
-    super(type);
-  }
-
-  @Override
-  public long position() throws IOException {
-    return ((PositionedOutput) out).position();
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
deleted file mode 100644
index aa3122d..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.IOException;
-
-/**
- * For any byte sink (but especially OutputStream), the ability to report how many bytes have been
- * sunk.
- */
-public interface PositionedOutput {
-  long position() throws IOException;
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
deleted file mode 100644
index 403c1a2..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.streams;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Objects;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-/**
- * Utility functions for {@link PositionedOutput}.
- */
-public class PositionedOutputs {
-  private PositionedOutputs() {}
-
-  /**
-   * Convert an {@code OutputStream} into an {@code OutputStream} implementing
-   * {@link PositionedOutput}.
-   */
-  public static PositionedOutputStream wrap(final OutputStream fout) {
-    Objects.requireNonNull(fout);
-    if (fout instanceof FSDataOutputStream) {
-      return new PositionedOutputStream(fout) {
-        @Override
-        public long position() throws IOException {
-          return ((FSDataOutputStream) fout).getPos();
-        }
-      };
-    } else if (fout instanceof PositionedOutput) {
-      return new PositionedOutputStream(fout) {
-        @Override
-        public long position() throws IOException {
-          return ((PositionedOutput) fout).position();
-        }
-      };
-    } else {
-      return new PositionedOutputStream(fout) {
-        @Override
-        public long position() throws IOException {
-          throw new UnsupportedOperationException("Underlying stream does not support position()");
-        }
-      };
-    }
-  }
-
-  private static abstract class PositionedOutputStream extends FilterOutputStream
-      implements PositionedOutput {
-    public PositionedOutputStream(OutputStream stream) {
-      super(stream);
-    }
-
-    @Override
-    public void write(byte[] data, int off, int len) throws IOException {
-      out.write(data, off, len);
-    }
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
index 417b89c..b83b898 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
@@ -16,21 +16,22 @@
  */
 package org.apache.accumulo.core.file.streams;
 
-import java.io.FilterOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 
 import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.hadoop.fs.FSDataOutputStream;
 
 /**
  * A decorator for {@code OutputStream} which limits the rate at which data may be written.
+ * Underlying OutputStream is a FSDataOutputStream.
  */
-public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput {
+public class RateLimitedOutputStream extends DataOutputStream {
   private final RateLimiter writeLimiter;
 
-  public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) {
-    super(PositionedOutputs.wrap(wrappedStream));
+  public RateLimitedOutputStream(FSDataOutputStream fsDataOutputStream, RateLimiter writeLimiter) {
+    super(fsDataOutputStream);
     this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter;
   }
 
@@ -51,8 +52,7 @@ public class RateLimitedOutputStream extends FilterOutputStream implements Posit
     out.close();
   }
 
-  @Override
   public long position() throws IOException {
-    return ((PositionedOutput) out).position();
+    return ((FSDataOutputStream) out).getPos();
   }
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index 8e09355..086e8b9 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -24,7 +24,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,8 +56,8 @@ public class CreateCompatTestFile {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", null,
-        conf, DefaultConfiguration.getInstance());
+    BCFile.Writer _cbw = new BCFile.Writer(fs.create(new Path(args[0])), null, "gz", conf,
+        DefaultConfiguration.getInstance());
     RFile.Writer writer = new RFile.Writer(_cbw, 1000);
 
     writer.startNewLocalityGroup("lg1",
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 656e7da..dd316a4 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -31,7 +31,6 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -56,8 +55,8 @@ public class MultiLevelIndexTest extends TestCase {
     AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz",
-        CachedConfiguration.getInstance(), aconf);
+    BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", CachedConfiguration.getInstance(),
+        aconf);
 
     BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
 
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index ee7b02b..ca19a1f 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -46,7 +46,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -147,8 +147,7 @@ public class MultiThreadedRFileTest {
       FileSystem fs = FileSystem.newInstance(conf);
       Path path = new Path("file://" + rfile);
       dos = fs.create(path, true);
-      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos),
-          "gz", conf, accumuloConfiguration);
+      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
           .newSamplerConfig(accumuloConfiguration);
       Sampler sampler = null;
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index f968b21..e23fbd7 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -66,7 +66,7 @@ import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
-import org.apache.accumulo.core.file.streams.PositionedOutputs;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
@@ -230,8 +230,7 @@ public class RFileTest {
     public void openWriter(boolean startDLG, int blockSize) throws IOException {
       baos = new ByteArrayOutputStream();
       dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos),
-          "gz", conf, accumuloConfiguration);
+      BCFile.Writer _cbw = new BCFile.Writer(dos, null, "gz", conf, accumuloConfiguration);
 
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl
           .newSamplerConfig(accumuloConfiguration);
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
index 18d229b..fbcc164 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
@@ -16,10 +16,10 @@
  */
 package org.apache.accumulo.core.file.streams;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,14 +45,9 @@ public class RateLimitedOutputStreamTest {
     Assert.assertEquals(bytesWritten, rateLimiter.getPermitsAcquired());
   }
 
-  public static class NullOutputStream extends FilterOutputStream implements PositionedOutput {
-    public NullOutputStream() {
-      super(new CountingOutputStream(ByteStreams.nullOutputStream()));
-    }
-
-    @Override
-    public long position() throws IOException {
-      return ((CountingOutputStream) out).getCount();
+  public static class NullOutputStream extends FSDataOutputStream {
+    public NullOutputStream() throws IOException {
+      super(new CountingOutputStream(ByteStreams.nullOutputStream()), null);
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact
mmiller@apache.org.

Mime
View raw message