accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [24/61] [abbrv] [partial] accumulo git commit: ACCUMULO-722 put trunk in my sandbox
Date Thu, 03 Mar 2016 21:59:49 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
new file mode 100644
index 0000000..1aab4d5
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Compression.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Compression related stuff.
+ */
+final class Compression {
+  static final Log LOG = LogFactory.getLog(Compression.class);
+  
+  /**
+   * Prevent the instantiation of class.
+   */
+  private Compression() {
+    // nothing
+  }
+  
+  static class FinishOnFlushCompressionStream extends FilterOutputStream {
+    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
+      super(cout);
+    }
+    
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+    
+    @Override
+    public void flush() throws IOException {
+      CompressionOutputStream cout = (CompressionOutputStream) out;
+      cout.finish();
+      cout.flush();
+      cout.resetState();
+    }
+  }
+  
+  /**
+   * Compression algorithms.
+   */
+  static enum Algorithm {
+    LZO(TFile.COMPRESSION_LZO) {
+      private transient boolean checked = false;
+      private static final String defaultClazz = "org.apache.hadoop.io.compress.LzoCodec";
+      private transient CompressionCodec codec = null;
+      
+      @Override
+      public synchronized boolean isSupported() {
+        if (!checked) {
+          checked = true;
+          String extClazz = (conf.get(CONF_LZO_CLASS) == null ? System.getProperty(CONF_LZO_CLASS)
: null);
+          String clazz = (extClazz != null) ? extClazz : defaultClazz;
+          try {
+            LOG.info("Trying to load Lzo codec class: " + clazz);
+            codec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz),
conf);
+          } catch (ClassNotFoundException e) {
+            // that is okay
+          }
+        }
+        return codec != null;
+      }
+      
+      @Override
+      CompressionCodec getCodec() throws IOException {
+        if (!isSupported()) {
+          throw new IOException("LZO codec class not specified. Did you forget to set property
" + CONF_LZO_CLASS + "?");
+        }
+        
+        return codec;
+      }
+      
+      @Override
+      public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor
decompressor, int downStreamBufferSize) throws IOException {
+        if (!isSupported()) {
+          throw new IOException("LZO codec class not specified. Did you forget to set property
" + CONF_LZO_CLASS + "?");
+        }
+        InputStream bis1 = null;
+        if (downStreamBufferSize > 0) {
+          bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
+        } else {
+          bis1 = downStream;
+        }
+        conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
+        CompressionInputStream cis = codec.createInputStream(bis1, decompressor);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return bis2;
+      }
+      
+      @Override
+      public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor
compressor, int downStreamBufferSize) throws IOException {
+        if (!isSupported()) {
+          throw new IOException("LZO codec class not specified. Did you forget to set property
" + CONF_LZO_CLASS + "?");
+        }
+        OutputStream bos1 = null;
+        if (downStreamBufferSize > 0) {
+          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+        } else {
+          bos1 = downStream;
+        }
+        conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
+        CompressionOutputStream cos = codec.createOutputStream(bos1, compressor);
+        BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
DATA_OBUF_SIZE);
+        return bos2;
+      }
+    },
+    
+    GZ(TFile.COMPRESSION_GZ) {
+      private transient DefaultCodec codec;
+      
+      @Override
+      synchronized CompressionCodec getCodec() {
+        if (codec == null) {
+          codec = new DefaultCodec();
+          codec.setConf(conf);
+        }
+        
+        return codec;
+      }
+      
+      @Override
+      public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor
decompressor, int downStreamBufferSize) throws IOException {
+        // Set the internal buffer size to read from down stream.
+        if (downStreamBufferSize > 0) {
+          codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
+        }
+        CompressionInputStream cis = codec.createInputStream(downStream, decompressor);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return bis2;
+      }
+      
+      @Override
+      public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor
compressor, int downStreamBufferSize) throws IOException {
+        OutputStream bos1 = null;
+        if (downStreamBufferSize > 0) {
+          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+        } else {
+          bos1 = downStream;
+        }
+        codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
+        CompressionOutputStream cos = codec.createOutputStream(bos1, compressor);
+        BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
DATA_OBUF_SIZE);
+        return bos2;
+      }
+      
+      @Override
+      public boolean isSupported() {
+        return true;
+      }
+    },
+    
+    NONE(TFile.COMPRESSION_NONE) {
+      @Override
+      CompressionCodec getCodec() {
+        return null;
+      }
+      
+      @Override
+      public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor
decompressor, int downStreamBufferSize) throws IOException {
+        if (downStreamBufferSize > 0) {
+          return new BufferedInputStream(downStream, downStreamBufferSize);
+        }
+        return downStream;
+      }
+      
+      @Override
+      public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor
compressor, int downStreamBufferSize) throws IOException {
+        if (downStreamBufferSize > 0) {
+          return new BufferedOutputStream(downStream, downStreamBufferSize);
+        }
+        
+        return downStream;
+      }
+      
+      @Override
+      public boolean isSupported() {
+        return true;
+      }
+    },
+    
+    SNAPPY(TFile.COMPRESSION_SNAPPY) {
+      // Use base type to avoid compile-time dependencies.
+      private transient CompressionCodec snappyCodec = null;
+      private transient boolean checked = false;
+      private static final String defaultClazz = "org.apache.hadoop.io.compress.SnappyCodec";
+      
+      public CompressionCodec getCodec() throws IOException {
+        if (!isSupported()) {
+          throw new IOException("SNAPPY codec class not specified. Did you forget to set
property " + CONF_SNAPPY_CLASS + "?");
+        }
+        return snappyCodec;
+      }
+      
+      @Override
+      public synchronized OutputStream createCompressionStream(OutputStream downStream, Compressor
compressor, int downStreamBufferSize) throws IOException {
+        
+        if (!isSupported()) {
+          throw new IOException("SNAPPY codec class not specified. Did you forget to set
property " + CONF_SNAPPY_CLASS + "?");
+        }
+        OutputStream bos1 = null;
+        if (downStreamBufferSize > 0) {
+          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+        } else {
+          bos1 = downStream;
+        }
+        conf.setInt("io.compression.codec.snappy.buffersize", 64 * 1024);
+        CompressionOutputStream cos = snappyCodec.createOutputStream(bos1, compressor);
+        BufferedOutputStream bos2 = new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
DATA_OBUF_SIZE);
+        return bos2;
+      }
+      
+      @Override
+      public synchronized InputStream createDecompressionStream(InputStream downStream, Decompressor
decompressor, int downStreamBufferSize) throws IOException {
+        if (!isSupported()) {
+          throw new IOException("SNAPPY codec class not specified. Did you forget to set
property " + CONF_SNAPPY_CLASS + "?");
+        }
+        if (downStreamBufferSize > 0) {
+          conf.setInt("io.file.buffer.size", downStreamBufferSize);
+        }
+        CompressionInputStream cis = snappyCodec.createInputStream(downStream, decompressor);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return bis2;
+      }
+      
+      @Override
+      public synchronized boolean isSupported() {
+        if (!checked) {
+          checked = true;
+          String extClazz = (conf.get(CONF_SNAPPY_CLASS) == null ? System.getProperty(CONF_SNAPPY_CLASS)
: null);
+          String clazz = (extClazz != null) ? extClazz : defaultClazz;
+          try {
+            LOG.info("Trying to load snappy codec class: " + clazz);
+            snappyCodec = (CompressionCodec) ReflectionUtils.newInstance(Class.forName(clazz),
conf);
+          } catch (ClassNotFoundException e) {
+            // that is okay
+          }
+        }
+        return snappyCodec != null;
+      }
+    };
+    // We require that all compression related settings are configured
+    // statically in the Configuration object.
+    protected static final Configuration conf = new Configuration();
+    private final String compressName;
+    // data input buffer size to absorb small reads from application.
+    private static final int DATA_IBUF_SIZE = 1 * 1024;
+    // data output buffer size to absorb small writes from application.
+    private static final int DATA_OBUF_SIZE = 4 * 1024;
+    public static final String CONF_LZO_CLASS = "io.compression.codec.lzo.class";
+    public static final String CONF_SNAPPY_CLASS = "io.compression.codec.snappy.class";
+    
+    Algorithm(String name) {
+      this.compressName = name;
+    }
+    
+    abstract CompressionCodec getCodec() throws IOException;
+    
+    public abstract InputStream createDecompressionStream(InputStream downStream, Decompressor
decompressor, int downStreamBufferSize) throws IOException;
+    
+    public abstract OutputStream createCompressionStream(OutputStream downStream, Compressor
compressor, int downStreamBufferSize) throws IOException;
+    
+    public abstract boolean isSupported();
+    
+    public Compressor getCompressor() throws IOException {
+      CompressionCodec codec = getCodec();
+      if (codec != null) {
+        Compressor compressor = CodecPool.getCompressor(codec);
+        if (compressor != null) {
+          if (compressor.finished()) {
+            // Somebody returns the compressor to CodecPool but is still using
+            // it.
+            LOG.warn("Compressor obtained from CodecPool already finished()");
+          } else {
+            LOG.debug("Got a compressor: " + compressor.hashCode());
+          }
+          /**
+           * Following statement is necessary to get around bugs in 0.18 where a compressor
is referenced after returned back to the codec pool.
+           */
+          compressor.reset();
+        }
+        return compressor;
+      }
+      return null;
+    }
+    
+    public void returnCompressor(Compressor compressor) {
+      if (compressor != null) {
+        LOG.debug("Return a compressor: " + compressor.hashCode());
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+    
+    public Decompressor getDecompressor() throws IOException {
+      CompressionCodec codec = getCodec();
+      if (codec != null) {
+        Decompressor decompressor = CodecPool.getDecompressor(codec);
+        if (decompressor != null) {
+          if (decompressor.finished()) {
+            // Somebody returns the decompressor to CodecPool but is still using
+            // it.
+            LOG.warn("Decompressor obtained from CodecPool already finished()");
+          } else {
+            LOG.debug("Got a decompressor: " + decompressor.hashCode());
+          }
+          /**
+           * Following statement is necessary to get around bugs in 0.18 where a decompressor
is referenced after returned back to the codec pool.
+           */
+          decompressor.reset();
+        }
+        return decompressor;
+      }
+      
+      return null;
+    }
+    
+    public void returnDecompressor(Decompressor decompressor) {
+      if (decompressor != null) {
+        LOG.debug("Returned a decompressor: " + decompressor.hashCode());
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+    
+    public String getName() {
+      return compressName;
+    }
+  }
+  
+  static Algorithm getCompressionAlgorithmByName(String compressName) {
+    Algorithm[] algos = Algorithm.class.getEnumConstants();
+    
+    for (Algorithm a : algos) {
+      if (a.getName().equals(compressName)) {
+        return a;
+      }
+    }
+    
+    throw new IllegalArgumentException("Unsupported compression algorithm name: " + compressName);
+  }
+  
+  static String[] getSupportedAlgorithms() {
+    Algorithm[] algos = Algorithm.class.getEnumConstants();
+    
+    ArrayList<String> ret = new ArrayList<String>();
+    for (Algorithm a : algos) {
+      if (a.isSupported()) {
+        ret.add(a.getName());
+      }
+    }
+    return ret.toArray(new String[ret.size()]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java
b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java
new file mode 100644
index 0000000..6f22431
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockAlreadyExists.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.IOException;
+
+/**
+ * Exception - Meta Block with the same name already exists.
+ */
/**
 * Exception - Meta Block with the same name already exists.
 */
public class MetaBlockAlreadyExists extends IOException {
  // Declare an explicit serialVersionUID rather than suppressing the
  // "serial" warning, so the serialized form is pinned deliberately.
  private static final long serialVersionUID = 1L;

  /**
   * Constructor (package-private).
   * 
   * @param s
   *          message.
   */
  MetaBlockAlreadyExists(String s) {
    super(s);
  }
}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java
b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java
new file mode 100644
index 0000000..b703a18
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/MetaBlockDoesNotExist.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.IOException;
+
+/**
+ * Exception - No such Meta Block with the given name.
+ */
/**
 * Exception - No such Meta Block with the given name.
 */
public class MetaBlockDoesNotExist extends IOException {
  // Declare an explicit serialVersionUID rather than suppressing the
  // "serial" warning, so the serialized form is pinned deliberately.
  private static final long serialVersionUID = 1L;

  /**
   * Constructor (package-private).
   * 
   * @param s
   *          message.
   */
  MetaBlockDoesNotExist(String s) {
    super(s);
  }
}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java
b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java
new file mode 100644
index 0000000..7fa4d79
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/PrintInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.PrintStream;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.core.file.rfile.bcfile.BCFile.MetaIndexEntry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class PrintInfo {
+  public static void printMetaBlockInfo(Configuration conf, FileSystem fs, Path path) throws
Exception {
+    FSDataInputStream fsin = fs.open(path);
+    BCFile.Reader bcfr = new BCFile.Reader(fsin, fs.getFileStatus(path).getLen(), conf);
+    
+    Set<Entry<String,MetaIndexEntry>> es = bcfr.metaIndex.index.entrySet();
+    
+    for (Entry<String,MetaIndexEntry> entry : es) {
+      PrintStream out = System.out;
+      out.println("Meta block     : " + entry.getKey());
+      out.println("      Raw size             : " + String.format("%,d", entry.getValue().getRegion().getRawSize())
+ " bytes");
+      out.println("      Compressed size      : " + String.format("%,d", entry.getValue().getRegion().getCompressedSize())
+ " bytes");
+      out.println("      Compression type     : " + entry.getValue().getCompressionAlgorithm().getName());
+      out.println();
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    @SuppressWarnings("deprecation")
+    // Not for client use
+    FileSystem fs = FileUtil.getFileSystem(conf, AccumuloConfiguration.getSiteConfiguration());
+    Path path = new Path(args[0]);
+    printMetaBlockInfo(conf, fs, path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java
b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java
new file mode 100644
index 0000000..8f01914
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/RawComparable.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Interface for objects that can be compared through {@link RawComparator}. This is useful
in places where we need a single object reference to specify a range
+ * of bytes in a byte array, such as {@link Comparable} or {@link Collections#binarySearch(java.util.List,
Object, Comparator)}
+ * 
+ * The actual comparison among RawComparable's requires an external RawComparator and it
is applications' responsibility to ensure two RawComparable are
+ * supposed to be semantically comparable with the same RawComparator.
+ */
/**
 * Interface for objects that can be compared through {@link RawComparator}. This is useful in places where we need a single object reference to specify a range
 * of bytes in a byte array, such as {@link Comparable} or {@link Collections#binarySearch(java.util.List, Object, Comparator)}
 * 
 * The actual comparison among RawComparable's requires an external RawComparator and it is applications' responsibility to ensure two RawComparable are
 * supposed to be semantically comparable with the same RawComparator.
 */
public interface RawComparable {
  // Note: interface methods are implicitly public abstract; the redundant
  // "abstract" modifiers have been removed.

  /**
   * Get the underlying byte array.
   * 
   * @return The underlying byte array.
   */
  byte[] buffer();

  /**
   * Get the offset of the first byte in the byte array.
   * 
   * @return The offset of the first byte in the byte array.
   */
  int offset();

  /**
   * Get the size of the byte range in the byte array.
   * 
   * @return The size of the byte range in the byte array.
   */
  int size();
}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7bdbfccb/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java
----------------------------------------------------------------------
diff --git a/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java
b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java
new file mode 100644
index 0000000..6cd7752
--- /dev/null
+++ b/1.5/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/SimpleBufferedOutputStream.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.accumulo.core.file.rfile.bcfile;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A simplified BufferedOutputStream with borrowed buffer, and allow users to see how much
data have been buffered.
+ */
/**
 * A minimal buffered output stream that stages bytes in a caller-supplied
 * ("borrowed") buffer and lets callers observe how many bytes are currently
 * buffered via {@link #size()}.
 */
class SimpleBufferedOutputStream extends FilterOutputStream {
  protected byte buf[]; // the borrowed buffer
  protected int count = 0; // bytes used in buffer.

  // Constructor
  public SimpleBufferedOutputStream(OutputStream out, byte[] buf) {
    super(out);
    this.buf = buf;
  }

  // Drain any staged bytes to the wrapped stream and empty the buffer.
  private void flushBuffer() throws IOException {
    if (count <= 0) {
      return;
    }
    out.write(buf, 0, count);
    count = 0;
  }

  @Override
  public void write(int b) throws IOException {
    // Make room first if the buffer is already full.
    if (buf.length <= count) {
      flushBuffer();
    }
    buf[count] = (byte) b;
    count++;
  }

  @Override
  public void write(byte b[], int off, int len) throws IOException {
    if (len >= buf.length) {
      // Request spans at least a whole buffer: skip staging, write through.
      flushBuffer();
      out.write(b, off, len);
    } else {
      if (buf.length - count < len) {
        // Not enough room left; drain before staging.
        flushBuffer();
      }
      System.arraycopy(b, off, buf, count, len);
      count += len;
    }
  }

  @Override
  public synchronized void flush() throws IOException {
    flushBuffer();
    out.flush();
  }

  /** @return the number of bytes currently staged in the internal buffer. */
  public int size() {
    return count;
  }
}


Mime
View raw message