hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [1/2] hive git commit: HIVE-14453 : refactor physical writing of ORC data and metadata to FS from the logical writers (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date Thu, 08 Dec 2016 19:07:37 GMT
Repository: hive
Updated Branches:
  refs/heads/master 84b7fc5bd -> 777477f25


HIVE-14453 : refactor physical writing of ORC data and metadata to FS from the logical writers (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/65d8fae0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/65d8fae0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/65d8fae0

Branch: refs/heads/master
Commit: 65d8fae07034e640f13e5b5c88db57204d4e0787
Parents: 84b7fc5
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Thu Dec 8 11:04:42 2016 -0800
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Thu Dec 8 11:04:42 2016 -0800

----------------------------------------------------------------------
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   3 +-
 orc/src/java/org/apache/orc/impl/OrcTail.java   |   2 +-
 .../org/apache/orc/impl/PhysicalFsWriter.java   | 529 ++++++++++++++++++
 .../org/apache/orc/impl/PhysicalWriter.java     | 123 +++++
 .../java/org/apache/orc/impl/ReaderImpl.java    |   6 +-
 .../org/apache/orc/impl/RecordReaderUtils.java  |   2 +-
 .../java/org/apache/orc/impl/WriterImpl.java    | 535 ++-----------------
 .../org/apache/orc/impl/TestOrcWideTable.java   |  12 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java       |   8 +
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  25 +-
 10 files changed, 743 insertions(+), 502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 2cb7f79..29f1ba8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.TreeReaderFactory;
 import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
 import org.apache.orc.impl.TreeReaderFactory.TreeReader;
@@ -85,7 +86,7 @@ public class OrcEncodedDataConsumer
     assert fileMetadata == null;
     fileMetadata = f;
     stripes = new OrcStripeMetadata[f.getStripes().size()];
-    codec = WriterImpl.createCodec(fileMetadata.getCompressionKind());
+    codec = PhysicalFsWriter.createCodec(fileMetadata.getCompressionKind());
   }
 
   public void setStripeMetadata(OrcStripeMetadata m) {

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcTail.java b/orc/src/java/org/apache/orc/impl/OrcTail.java
index b5f85fb..f095603 100644
--- a/orc/src/java/org/apache/orc/impl/OrcTail.java
+++ b/orc/src/java/org/apache/orc/impl/OrcTail.java
@@ -87,7 +87,7 @@ public final class OrcTail {
   }
 
   public CompressionCodec getCompressionCodec() {
-    return WriterImpl.createCodec(getCompressionKind());
+    return PhysicalFsWriter.createCodec(getCompressionKind());
   }
 
   public int getCompressionBufferSize() {

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java
new file mode 100644
index 0000000..ba8c13f
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionCodec.Modifier;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFile.CompressionStrategy;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.BloomFilterIndex;
+import org.apache.orc.OrcProto.Footer;
+import org.apache.orc.OrcProto.Metadata;
+import org.apache.orc.OrcProto.PostScript;
+import org.apache.orc.OrcProto.Stream.Kind;
+import org.apache.orc.OrcProto.StripeFooter;
+import org.apache.orc.OrcProto.StripeInformation;
+import org.apache.orc.OrcProto.RowIndex.Builder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedOutputStream;
+
+public class PhysicalFsWriter implements PhysicalWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
+
+  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+  private FSDataOutputStream rawWriter = null;
+  // the compressed metadata information outStream
+  private OutStream writer = null;
+  // a protobuf outStream around streamFactory
+  private CodedOutputStream protobufWriter = null;
+
+  private final FileSystem fs;
+  private final Path path;
+  private final long blockSize;
+  private final int bufferSize;
+  private final CompressionCodec codec;
+  private final double paddingTolerance;
+  private final long defaultStripeSize;
+  private final CompressionKind compress;
+  private final boolean addBlockPadding;
+  private final CompressionStrategy compressionStrategy;
+
+  // the streams that make up the current stripe
+  private final Map<StreamName, BufferedStream> streams =
+    new TreeMap<StreamName, BufferedStream>();
+
+  private long adjustedStripeSize;
+  private long headerLength;
+  private long stripeStart;
+  private int metadataLength;
+  private int footerLength;
+
+  public PhysicalFsWriter(FileSystem fs, Path path, int numColumns, OrcFile.WriterOptions opts) {
+    this.fs = fs;
+    this.path = path;
+    this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize();
+    this.addBlockPadding = opts.getBlockPadding();
+    if (opts.isEnforceBufferSize()) {
+      this.bufferSize = opts.getBufferSize();
+    } else {
+      this.bufferSize = getEstimatedBufferSize(defaultStripeSize, numColumns, opts.getBufferSize());
+    }
+    this.compress = opts.getCompress();
+    this.compressionStrategy = opts.getCompressionStrategy();
+    codec = createCodec(compress);
+    this.paddingTolerance = opts.getPaddingTolerance();
+    this.blockSize = opts.getBlockSize();
+    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+        " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
+        compress, bufferSize);
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    if (rawWriter != null) return;
+    rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
+                          fs.getDefaultReplication(path), blockSize);
+    rawWriter.writeBytes(OrcFile.MAGIC);
+    headerLength = rawWriter.getPos();
+    writer = new OutStream("metadata", bufferSize, codec,
+                           new DirectStream(rawWriter));
+    protobufWriter = CodedOutputStream.newInstance(writer);
+  }
+
+  private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
+    this.stripeStart = rawWriter.getPos();
+    final long currentStripeSize = indexSize + dataSize + footerSize;
+    final long available = blockSize - (stripeStart % blockSize);
+    final long overflow = currentStripeSize - adjustedStripeSize;
+    final float availRatio = (float) available / (float) defaultStripeSize;
+
+    if (availRatio > 0.0f && availRatio < 1.0f
+        && availRatio > paddingTolerance) {
+      // adjust default stripe size to fit into remaining space, also adjust
+      // the next stripe for correction based on the current stripe size
+      // and user specified padding tolerance. Since stripe size can overflow
+      // the default stripe size we should apply this correction to avoid
+      // writing portion of last stripe to next hdfs block.
+      double correction = overflow > 0 ? (double) overflow
+          / (double) adjustedStripeSize : 0.0;
+
+      // correction should not be greater than user specified padding
+      // tolerance
+      correction = correction > paddingTolerance ? paddingTolerance
+          : correction;
+
+      // adjust next stripe size based on current stripe estimate correction
+      adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
+    } else if (availRatio >= 1.0) {
+      adjustedStripeSize = defaultStripeSize;
+    }
+
+    if (availRatio < paddingTolerance && addBlockPadding) {
+      long padding = blockSize - (stripeStart % blockSize);
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+      LOG.info(String.format("Padding ORC by %d bytes (<=  %.2f * %d)", 
+          padding, availRatio, defaultStripeSize));
+      stripeStart += padding;
+      while (padding > 0) {
+        int writeLen = (int) Math.min(padding, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        padding -= writeLen;
+      }
+      adjustedStripeSize = defaultStripeSize;
+    } else if (currentStripeSize < blockSize
+        && (stripeStart % blockSize) + currentStripeSize > blockSize) {
+      // even if you don't pad, reset the default stripe size when crossing a
+      // block boundary
+      adjustedStripeSize = defaultStripeSize;
+    }
+  }
+
+  /**
+   * An output receiver that writes the ByteBuffers to the output stream
+   * as they are received.
+   */
+  private class DirectStream implements OutStream.OutputReceiver {
+    private final FSDataOutputStream output;
+
+    DirectStream(FSDataOutputStream output) {
+      this.output = output;
+    }
+
+    @Override
+    public void output(ByteBuffer buffer) throws IOException {
+      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+    }
+  }
+
+  @Override
+  public long getPhysicalStripeSize() {
+    return adjustedStripeSize;
+  }
+
+  @Override
+  public boolean isCompressed() {
+    return codec != null;
+  }
+
+
+  public static CompressionCodec createCodec(CompressionKind kind) {
+    switch (kind) {
+      case NONE:
+        return null;
+      case ZLIB:
+        return new ZlibCodec();
+      case SNAPPY:
+        return new SnappyCodec();
+      case LZO:
+        try {
+          ClassLoader loader = Thread.currentThread().getContextClassLoader();
+          if (loader == null) {
+            loader = WriterImpl.class.getClassLoader();
+          }
+          @SuppressWarnings("unchecked")
+          Class<? extends CompressionCodec> lzo =
+              (Class<? extends CompressionCodec>)
+              loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+          return lzo.newInstance();
+        } catch (ClassNotFoundException e) {
+          throw new IllegalArgumentException("LZO is not available.", e);
+        } catch (InstantiationException e) {
+          throw new IllegalArgumentException("Problem initializing LZO", e);
+        } catch (IllegalAccessException e) {
+          throw new IllegalArgumentException("Insufficient access to LZO", e);
+        }
+      default:
+        throw new IllegalArgumentException("Unknown compression codec: " +
+            kind);
+    }
+  }
+
+  private void writeStripeFooter(StripeFooter footer, long dataSize, long indexSize,
+      StripeInformation.Builder dirEntry) throws IOException {
+    footer.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    dirEntry.setOffset(stripeStart);
+    dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize);
+  }
+
+  @VisibleForTesting
+  public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+                                           int bs) {
+    // The worst case is that there are 2 big streams per a column and
+    // we want to guarantee that each stream gets ~10 buffers.
+    // This keeps buffers small enough that we don't get really small stripe
+    // sizes.
+    int estBufferSize = (int) (stripeSize / (20 * numColumns));
+    estBufferSize = getClosestBufferSize(estBufferSize);
+    return estBufferSize > bs ? bs : estBufferSize;
+  }
+
+  private static int getClosestBufferSize(int estBufferSize) {
+    final int kb4 = 4 * 1024;
+    final int kb8 = 8 * 1024;
+    final int kb16 = 16 * 1024;
+    final int kb32 = 32 * 1024;
+    final int kb64 = 64 * 1024;
+    final int kb128 = 128 * 1024;
+    final int kb256 = 256 * 1024;
+    if (estBufferSize <= kb4) {
+      return kb4;
+    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+      return kb8;
+    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+      return kb16;
+    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+      return kb32;
+    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+      return kb64;
+    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+      return kb128;
+    } else {
+      return kb256;
+    }
+  }
+
+  @Override
+  public void writeFileMetadata(Metadata.Builder builder) throws IOException {
+    long startPosn = rawWriter.getPos();
+    Metadata metadata = builder.build();
+    metadata.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    this.metadataLength = (int) (rawWriter.getPos() - startPosn);
+  }
+
+  @Override
+  public void writeFileFooter(Footer.Builder builder) throws IOException {
+    long bodyLength = rawWriter.getPos() - metadataLength;
+    builder.setContentLength(bodyLength);
+    builder.setHeaderLength(headerLength);
+    long startPosn = rawWriter.getPos();
+    Footer footer = builder.build();
+    footer.writeTo(protobufWriter);
+    protobufWriter.flush();
+    writer.flush();
+    this.footerLength = (int) (rawWriter.getPos() - startPosn);
+  }
+
+  @Override
+  public void writePostScript(PostScript.Builder builder) throws IOException {
+    builder.setCompression(writeCompressionKind(compress));
+    builder.setFooterLength(footerLength);
+    builder.setMetadataLength(metadataLength);
+    if (compress != CompressionKind.NONE) {
+      builder.setCompressionBlockSize(bufferSize);
+    }
+    PostScript ps = builder.build();
+    // need to write this uncompressed
+    long startPosn = rawWriter.getPos();
+    ps.writeTo(rawWriter);
+    long length = rawWriter.getPos() - startPosn;
+    if (length > 255) {
+      throw new IllegalArgumentException("PostScript too large at " + length);
+    }
+    rawWriter.writeByte((int)length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawWriter.close();
+  }
+
+  private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+    switch (kind) {
+      case NONE: return OrcProto.CompressionKind.NONE;
+      case ZLIB: return OrcProto.CompressionKind.ZLIB;
+      case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+      case LZO: return OrcProto.CompressionKind.LZO;
+      default:
+        throw new IllegalArgumentException("Unknown compression " + kind);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    rawWriter.hflush();
+    // TODO: reset?
+  }
+
+  @Override
+  public long getRawWriterPosition() throws IOException {
+    return rawWriter.getPos();
+  }
+
+  @Override
+  public void appendRawStripe(byte[] stripe, int offset, int length,
+      StripeInformation.Builder dirEntry) throws IOException {
+    long start = rawWriter.getPos();
+    long availBlockSpace = blockSize - (start % blockSize);
+
+    // see if stripe can fit in the current hdfs block, else pad the remaining
+    // space in the block
+    if (length < blockSize && length > availBlockSpace &&
+        addBlockPadding) {
+      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+      LOG.info(String.format("Padding ORC by %d bytes while merging..",
+          availBlockSpace));
+      start += availBlockSpace;
+      while (availBlockSpace > 0) {
+        int writeLen = (int) Math.min(availBlockSpace, pad.length);
+        rawWriter.write(pad, 0, writeLen);
+        availBlockSpace -= writeLen;
+      }
+    }
+
+    rawWriter.write(stripe);
+    dirEntry.setOffset(start);
+  }
+
+
+  /**
+   * This class is used to hold the contents of streams as they are buffered.
+   * The TreeWriters write to the outStream and the codec compresses the
+   * data as buffers fill up and stores them in the output list. When the
+   * stripe is being written, the whole stream is written to the file.
+   */
+  private class BufferedStream implements OutStream.OutputReceiver {
+    private final OutStream outStream;
+    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+    BufferedStream(String name, int bufferSize,
+                   CompressionCodec codec) throws IOException {
+      outStream = new OutStream(name, bufferSize, codec, this);
+    }
+
+    /**
+     * Receive a buffer from the compression codec.
+     * @param buffer the buffer to save
+     */
+    @Override
+    public void output(ByteBuffer buffer) {
+      output.add(buffer);
+    }
+
+    /**
+     * @return the number of bytes in buffers that are allocated to this stream.
+     */
+    public long getBufferSize() {
+      long result = 0;
+      for (ByteBuffer buf: output) {
+        result += buf.capacity();
+      }
+      return outStream.getBufferSize() + result;
+    }
+
+    /**
+     * Write any saved buffers to the OutputStream if needed, and clears all the buffers.
+     */
+    public void spillToDiskAndClear() throws IOException {
+      if (!outStream.isSuppressed()) {
+        for (ByteBuffer buffer: output) {
+          rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+            buffer.remaining());
+        }
+      }
+      outStream.clear();
+      output.clear();
+    }
+
+    /**
+     * @return The number of bytes that will be written to the output. Assumes the stream writing
+     *         into this receiver has already been flushed.
+     */
+    public long getOutputSize() {
+      long result = 0;
+      for (ByteBuffer buffer: output) {
+        result += buffer.remaining();
+      }
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return outStream.toString();
+    }
+  }
+
+  @Override
+  public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+    BufferedStream result = streams.get(name);
+    if (result == null) {
+      EnumSet<Modifier> modifiers = createCompressionModifiers(name.getKind());
+      result = new BufferedStream(name.toString(), bufferSize,
+          codec == null ? null : codec.modify(modifiers));
+      streams.put(name, result);
+    }
+    return result.outStream;
+  }
+
+  private EnumSet<Modifier> createCompressionModifiers(Kind kind) {
+    switch (kind) {
+      case BLOOM_FILTER:
+      case DATA:
+      case DICTIONARY_DATA:
+        return EnumSet.of(Modifier.TEXT,
+            compressionStrategy == CompressionStrategy.SPEED ? Modifier.FAST : Modifier.DEFAULT);
+      case LENGTH:
+      case DICTIONARY_COUNT:
+      case PRESENT:
+      case ROW_INDEX:
+      case SECONDARY:
+        // easily compressed using the fastest modes
+        return EnumSet.of(CompressionCodec.Modifier.FASTEST, CompressionCodec.Modifier.BINARY);
+      default:
+        LOG.warn("Missing ORC compression modifiers for " + kind);
+        return null;
+    }
+  }
+
+  @Override
+  public void finalizeStripe(StripeFooter.Builder footerBuilder,
+      StripeInformation.Builder dirEntry) throws IOException {
+    long indexSize = 0;
+    long dataSize = 0;
+    for (Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+      BufferedStream receiver = pair.getValue();
+      OutStream outStream = receiver.outStream;
+      if (!outStream.isSuppressed()) {
+        outStream.flush();
+        long streamSize = receiver.getOutputSize();
+        StreamName name = pair.getKey();
+        footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
+            .setKind(name.getKind()).setLength(streamSize));
+        if (StreamName.Area.INDEX == name.getArea()) {
+          indexSize += streamSize;
+        } else {
+          dataSize += streamSize;
+        }
+      }
+    }
+    dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+
+    OrcProto.StripeFooter footer = footerBuilder.build();
+    // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+    padStripe(indexSize, dataSize, footer.getSerializedSize());
+
+    // write out the data streams
+    for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+      pair.getValue().spillToDiskAndClear();
+    }
+    // Write out the footer.
+    writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+  }
+
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    for (BufferedStream stream: streams.values()) {
+      result += stream.getBufferSize();
+    }
+    return result;
+  }
+
+  @Override
+  public void writeIndexStream(StreamName name, Builder rowIndex) throws IOException {
+    OutStream stream = getOrCreatePhysicalStream(name);
+    rowIndex.build().writeTo(stream);
+    stream.flush();
+  }
+
+  @Override
+  public void writeBloomFilterStream(
+      StreamName name, BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+    OutStream stream = getOrCreatePhysicalStream(name);
+    bloomFilterIndex.build().writeTo(stream);
+    stream.flush();
+  }
+
+  @VisibleForTesting
+  public OutputStream getStream() throws IOException {
+    initialize();
+    return rawWriter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
new file mode 100644
index 0000000..83742e4
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.orc.OrcProto.BloomFilterIndex;
+import org.apache.orc.OrcProto.Footer;
+import org.apache.orc.OrcProto.Metadata;
+import org.apache.orc.OrcProto.PostScript;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.StripeFooter;
+import org.apache.orc.OrcProto.StripeInformation;
+
+public interface PhysicalWriter {
+
+  /**
+   * Creates all the streams/connections/etc. necessary to write.
+   */
+  void initialize() throws IOException;
+
+  /**
+   * Writes out the file metadata.
+   * @param builder Metadata builder to finalize and write.
+   */
+  void writeFileMetadata(Metadata.Builder builder) throws IOException;
+
+  /**
+   * Writes out the file footer.
+   * @param builder Footer builder to finalize and write.
+   */
+  void writeFileFooter(Footer.Builder builder) throws IOException;
+
+  /**
+   * Writes out the postscript (including the size byte if needed).
+   * @param builder Postscript builder to finalize and write.
+   */
+  void writePostScript(PostScript.Builder builder) throws IOException;
+
+  /**
+   * Creates physical stream to write data to.
+   * @param name Stream name.
+   * @return The output stream.
+   */
+  OutStream getOrCreatePhysicalStream(StreamName name) throws IOException;
+
+  /**
+   * Flushes the data in all the streams, spills them to disk, write out stripe footer.
+   * @param footer Stripe footer to be updated with relevant data and written out.
+   * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+   */
+  void finalizeStripe(StripeFooter.Builder footer,
+      StripeInformation.Builder dirEntry) throws IOException;
+
+  /**
+   * Writes out the index for the stripe column.
+   * @param streamName Stream name.
+   * @param rowIndex Row index entries to write.
+   */
+  void writeIndexStream(StreamName name, RowIndex.Builder rowIndex) throws IOException;
+
+  /**
+   * Writes out the index for the stripe column.
+   * @param streamName Stream name.
+   * @param bloomFilterIndex Bloom filter index to write.
+   */
+  void writeBloomFilterStream(StreamName streamName,
+      BloomFilterIndex.Builder bloomFilterIndex) throws IOException;
+
+  /**
+   * Closes the writer.
+   */
+  void close() throws IOException;
+
+  /**
+   * Force-flushes the writer.
+   */
+  void flush() throws IOException;
+
+  /**
+   * @return the physical writer position (e.g. for updater).
+   */
+  long getRawWriterPosition() throws IOException;
+
+  /** @return physical stripe size, taking padding into account. */
+  long getPhysicalStripeSize();
+
+  /** @return whether the writer is compressed. */
+  boolean isCompressed();
+
+  /**
+   * Appends raw stripe data (e.g. for file merger).
+   * @param stripe Stripe data buffer.
+   * @param offset Stripe data buffer offset.
+   * @param length Stripe data buffer length.
+   * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+   * @throws IOException
+   */
+  void appendRawStripe(byte[] stripe, int offset, int length,
+      StripeInformation.Builder dirEntry) throws IOException;
+
+  /**
+   * @return the estimated memory usage for the stripe.
+   */
+  long estimateMemory();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
index 93fc0ce..70fa628 100644
--- a/orc/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -350,7 +350,7 @@ public class ReaderImpl implements Reader {
     if (fileMetadata != null) {
       this.compressionKind = fileMetadata.getCompressionKind();
       this.bufferSize = fileMetadata.getCompressionBufferSize();
-      this.codec = WriterImpl.createCodec(compressionKind);
+      this.codec = PhysicalFsWriter.createCodec(compressionKind);
       this.metadataSize = fileMetadata.getMetadataSize();
       this.stripeStats = fileMetadata.getStripeStats();
       this.versionList = fileMetadata.getVersionList();
@@ -459,7 +459,7 @@ public class ReaderImpl implements Reader {
     System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
     OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
     int footerSize = (int) ps.getFooterLength();
-    CompressionCodec codec = WriterImpl
+    CompressionCodec codec = PhysicalFsWriter
         .createCodec(CompressionKind.valueOf(ps.getCompression().name()));
     OrcProto.Footer footer = extractFooter(buffer,
         (int) (buffer.position() + ps.getMetadataLength()),
@@ -509,7 +509,7 @@ public class ReaderImpl implements Reader {
       int psOffset = readSize - 1 - psLen;
       ps = extractPostScript(buffer, path, psLen, psOffset);
       bufferSize = (int) ps.getCompressionBlockSize();
-      codec = WriterImpl.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+      codec = PhysicalFsWriter.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
       fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);
 
       int footerSize = (int) ps.getFooterLength();

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 1067957..6100d50 100644
--- a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -71,7 +71,7 @@ public class RecordReaderUtils {
       this.fs = properties.getFileSystem();
       this.path = properties.getPath();
       this.useZeroCopy = properties.getZeroCopy();
-      this.codec = WriterImpl.createCodec(properties.getCompression());
+      this.codec = PhysicalFsWriter.createCodec(properties.getCompression());
       this.bufferSize = properties.getBufferSize();
       this.typeCount = properties.getTypeCount();
       if (useZeroCopy) {

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b2966e0..b17fb41 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -21,12 +21,10 @@ package org.apache.orc.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
@@ -35,11 +33,12 @@ import java.util.TreeMap;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.orc.BinaryColumnStatistics;
 import org.apache.orc.BloomFilterIO;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.BloomFilterIndex;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.Stream;
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StringColumnStatistics;
 import org.apache.orc.StripeInformation;
@@ -48,7 +47,6 @@ import org.apache.orc.Writer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -69,7 +67,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -94,35 +91,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
   private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
 
-  private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
-  // threshold above which buffer size will be automatically resized
-  private static final int COLUMN_COUNT_THRESHOLD = 1000;
-
-  private final FileSystem fs;
   private final Path path;
-  private final long defaultStripeSize;
-  private long adjustedStripeSize;
   private final int rowIndexStride;
-  private final CompressionKind compress;
-  private final CompressionCodec codec;
-  private final boolean addBlockPadding;
-  private final int bufferSize;
-  private final long blockSize;
-  private final double paddingTolerance;
   private final TypeDescription schema;
 
-  // the streams that make up the current stripe
-  private final Map<StreamName, BufferedStream> streams =
-    new TreeMap<StreamName, BufferedStream>();
-
-  private FSDataOutputStream rawWriter = null;
-  // the compressed metadata information outStream
-  private OutStream writer = null;
-  // a protobuf outStream around streamFactory
-  private CodedOutputStream protobufWriter = null;
-  private long headerLength;
+  @VisibleForTesting
+  protected final PhysicalWriter physWriter;
   private int columnCount;
   private long rowCount = 0;
   private long rowsInStripe = 0;
@@ -142,7 +118,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private final OrcFile.WriterCallback callback;
   private final OrcFile.WriterContext callbackContext;
   private final OrcFile.EncodingStrategy encodingStrategy;
-  private final OrcFile.CompressionStrategy compressionStrategy;
   private final boolean[] bloomFilterColumns;
   private final double bloomFilterFpp;
   private boolean writeTimeZone;
@@ -150,7 +125,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   public WriterImpl(FileSystem fs,
                     Path path,
                     OrcFile.WriterOptions opts) throws IOException {
-    this.fs = fs;
     this.path = path;
     this.conf = opts.getConfiguration();
     this.callback = opts.getCallback();
@@ -166,26 +140,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     } else {
       callbackContext = null;
     }
-    this.adjustedStripeSize = opts.getStripeSize();
-    this.defaultStripeSize = opts.getStripeSize();
     this.version = opts.getVersion();
     this.encodingStrategy = opts.getEncodingStrategy();
-    this.compressionStrategy = opts.getCompressionStrategy();
-    this.addBlockPadding = opts.getBlockPadding();
-    this.blockSize = opts.getBlockSize();
-    this.paddingTolerance = opts.getPaddingTolerance();
-    this.compress = opts.getCompress();
     this.rowIndexStride = opts.getRowIndexStride();
     this.memoryManager = opts.getMemoryManager();
     buildIndex = rowIndexStride > 0;
-    codec = createCodec(compress);
-    int numColumns = schema.getMaximumId() + 1;
-    if (opts.isEnforceBufferSize()) {
-      this.bufferSize = opts.getBufferSize();
-    } else {
-      this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
-          numColumns, opts.getBufferSize());
-    }
     if (version == OrcFile.Version.V_0_11) {
       /* do not write bloom filters for ORC v11 */
       this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
@@ -194,6 +153,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
           OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
     }
     this.bloomFilterFpp = opts.getBloomFilterFpp();
+    int numColumns = schema.getMaximumId() + 1;
+    physWriter = new PhysicalFsWriter(fs, path, numColumns, opts);
     treeWriter = createTreeWriter(schema, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
@@ -202,83 +163,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
 
     // ensure that we are able to handle callbacks before we register ourselves
     memoryManager.addWriter(path, opts.getStripeSize(), this);
-    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
-        " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
-        compress, bufferSize);
-  }
-
-  @VisibleForTesting
-  public static int getEstimatedBufferSize(long stripeSize, int numColumns,
-                                           int bs) {
-    // The worst case is that there are 2 big streams per a column and
-    // we want to guarantee that each stream gets ~10 buffers.
-    // This keeps buffers small enough that we don't get really small stripe
-    // sizes.
-    int estBufferSize = (int) (stripeSize / (20 * numColumns));
-    estBufferSize = getClosestBufferSize(estBufferSize);
-    return estBufferSize > bs ? bs : estBufferSize;
-  }
-
-  private static int getClosestBufferSize(int estBufferSize) {
-    final int kb4 = 4 * 1024;
-    final int kb8 = 8 * 1024;
-    final int kb16 = 16 * 1024;
-    final int kb32 = 32 * 1024;
-    final int kb64 = 64 * 1024;
-    final int kb128 = 128 * 1024;
-    final int kb256 = 256 * 1024;
-    if (estBufferSize <= kb4) {
-      return kb4;
-    } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
-      return kb8;
-    } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
-      return kb16;
-    } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
-      return kb32;
-    } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
-      return kb64;
-    } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
-      return kb128;
-    } else {
-      return kb256;
-    }
-  }
-
-  public static CompressionCodec createCodec(CompressionKind kind) {
-    switch (kind) {
-      case NONE:
-        return null;
-      case ZLIB:
-        return new ZlibCodec();
-      case SNAPPY:
-        return new SnappyCodec();
-      case LZO:
-        try {
-          ClassLoader loader = Thread.currentThread().getContextClassLoader();
-          if (loader == null) {
-            loader = WriterImpl.class.getClassLoader();
-          }
-          @SuppressWarnings("unchecked")
-          Class<? extends CompressionCodec> lzo =
-              (Class<? extends CompressionCodec>)
-              loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
-          return lzo.newInstance();
-        } catch (ClassNotFoundException e) {
-          throw new IllegalArgumentException("LZO is not available.", e);
-        } catch (InstantiationException e) {
-          throw new IllegalArgumentException("Problem initializing LZO", e);
-        } catch (IllegalAccessException e) {
-          throw new IllegalArgumentException("Insufficient access to LZO", e);
-        }
-      default:
-        throw new IllegalArgumentException("Unknown compression codec: " +
-            kind);
-    }
   }
 
   @Override
   public boolean checkMemory(double newScale) throws IOException {
-    long limit = (long) Math.round(adjustedStripeSize * newScale);
+    long limit = (long) Math.round(physWriter.getPhysicalStripeSize() * newScale);
     long size = estimateStripeSize();
     if (LOG.isDebugEnabled()) {
       LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
@@ -291,116 +180,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     return false;
   }
 
-  /**
-   * This class is used to hold the contents of streams as they are buffered.
-   * The TreeWriters write to the outStream and the codec compresses the
-   * data as buffers fill up and stores them in the output list. When the
-   * stripe is being written, the whole stream is written to the file.
-   */
-  private class BufferedStream implements OutStream.OutputReceiver {
-    private final OutStream outStream;
-    private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
-    BufferedStream(String name, int bufferSize,
-                   CompressionCodec codec) throws IOException {
-      outStream = new OutStream(name, bufferSize, codec, this);
-    }
-
-    /**
-     * Receive a buffer from the compression codec.
-     * @param buffer the buffer to save
-     */
-    @Override
-    public void output(ByteBuffer buffer) {
-      output.add(buffer);
-    }
-
-    /**
-     * Get the number of bytes in buffers that are allocated to this stream.
-     * @return number of bytes in buffers
-     */
-    public long getBufferSize() {
-      long result = 0;
-      for(ByteBuffer buf: output) {
-        result += buf.capacity();
-      }
-      return outStream.getBufferSize() + result;
-    }
-
-    /**
-     * Flush the stream to the codec.
-     * @throws IOException
-     */
-    public void flush() throws IOException {
-      outStream.flush();
-    }
-
-    /**
-     * Clear all of the buffers.
-     * @throws IOException
-     */
-    public void clear() throws IOException {
-      outStream.clear();
-      output.clear();
-    }
-
-    /**
-     * Check the state of suppress flag in output stream
-     * @return value of suppress flag
-     */
-    public boolean isSuppressed() {
-      return outStream.isSuppressed();
-    }
-
-    /**
-     * Get the number of bytes that will be written to the output. Assumes
-     * the stream has already been flushed.
-     * @return the number of bytes
-     */
-    public long getOutputSize() {
-      long result = 0;
-      for(ByteBuffer buffer: output) {
-        result += buffer.remaining();
-      }
-      return result;
-    }
-
-    /**
-     * Write the saved compressed buffers to the OutputStream.
-     * @param out the stream to write to
-     * @throws IOException
-     */
-    void spillTo(OutputStream out) throws IOException {
-      for(ByteBuffer buffer: output) {
-        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-          buffer.remaining());
-      }
-    }
-
-    @Override
-    public String toString() {
-      return outStream.toString();
-    }
-  }
-
-  /**
-   * An output receiver that writes the ByteBuffers to the output stream
-   * as they are received.
-   */
-  private class DirectStream implements OutStream.OutputReceiver {
-    private final FSDataOutputStream output;
-
-    DirectStream(FSDataOutputStream output) {
-      this.output = output;
-    }
-
-    @Override
-    public void output(ByteBuffer buffer) throws IOException {
-      output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
-        buffer.remaining());
-    }
-  }
-
   private static class RowIndexPositionRecorder implements PositionRecorder {
     private final OrcProto.RowIndexEntry.Builder builder;
 
@@ -430,44 +209,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
                                   OrcProto.Stream.Kind kind
                                   ) throws IOException {
       final StreamName name = new StreamName(column, kind);
-      final EnumSet<CompressionCodec.Modifier> modifiers;
-
-      switch (kind) {
-        case BLOOM_FILTER:
-        case DATA:
-        case DICTIONARY_DATA:
-          if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
-            modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
-                CompressionCodec.Modifier.TEXT);
-          } else {
-            modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT,
-                CompressionCodec.Modifier.TEXT);
-          }
-          break;
-        case LENGTH:
-        case DICTIONARY_COUNT:
-        case PRESENT:
-        case ROW_INDEX:
-        case SECONDARY:
-          // easily compressed using the fastest modes
-          modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
-              CompressionCodec.Modifier.BINARY);
-          break;
-        default:
-          LOG.warn("Missing ORC compression modifiers for " + kind);
-          modifiers = null;
-          break;
-      }
+      return physWriter.getOrCreatePhysicalStream(name);
+    }
 
-      BufferedStream result = streams.get(name);
-      if (result == null) {
-        result = new BufferedStream(name.toString(), bufferSize,
-            codec == null ? codec : codec.modify(modifiers));
-        streams.put(name, result);
-      }
-      return result.outStream;
+    public void writeIndex(int column, RowIndex.Builder rowIndex) throws IOException {
+      physWriter.writeIndexStream(new StreamName(column, Stream.Kind.ROW_INDEX), rowIndex);
     }
 
+    public void writeBloomFilter(
+        int column, BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+      physWriter.writeBloomFilterStream(
+          new StreamName(column, Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
+    }
     /**
      * Get the next column id.
      * @return a number from 0 to the number of columns - 1
@@ -496,7 +249,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
      * @return are the streams compressed
      */
     public boolean isCompressed() {
-      return codec != null;
+      return physWriter.isCompressed();
     }
 
     /**
@@ -508,14 +261,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
 
     /**
-     * Get the compression strategy to use.
-     * @return compression strategy
-     */
-    public OrcFile.CompressionStrategy getCompressionStrategy() {
-      return compressionStrategy;
-    }
-
-    /**
      * Get the bloom filter columns
      * @return bloom filter columns
      */
@@ -572,8 +317,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     protected final RowIndexPositionRecorder rowIndexPosition;
     private final OrcProto.RowIndex.Builder rowIndex;
     private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
-    private final PositionedOutputStream rowIndexStream;
-    private final PositionedOutputStream bloomFilterStream;
     protected final BloomFilterIO bloomFilter;
     protected final boolean createBloomFilter;
     private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
@@ -615,21 +358,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
       rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
       stripeStatsBuilders = Lists.newArrayList();
-      if (streamFactory.buildIndex()) {
-        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
-      } else {
-        rowIndexStream = null;
-      }
       if (createBloomFilter) {
         bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
         bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
-        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
         bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
             streamFactory.getBloomFilterFPP());
       } else {
         bloomFilterEntry = null;
         bloomFilterIndex = null;
-        bloomFilterStream = null;
         bloomFilter = null;
       }
     }
@@ -758,11 +494,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
         isPresent.flush();
 
         // if no nulls are found in a stream, then suppress the stream
-        if(!foundNulls) {
+        if (!foundNulls) {
           isPresentOutStream.suppress();
           // since isPresent bitstream is suppressed, update the index to
           // remove the positions of the isPresent stream
-          if (rowIndexStream != null) {
+          if (streamFactory.buildIndex()) {
             removeIsPresentPositions();
           }
         }
@@ -781,22 +517,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       if (streamFactory.hasWriterTimeZone()) {
         builder.setWriterTimezone(TimeZone.getDefault().getID());
       }
-      if (rowIndexStream != null) {
+      if (streamFactory.buildIndex()) {
         if (rowIndex.getEntryCount() != requiredIndexEntries) {
           throw new IllegalArgumentException("Column has wrong number of " +
                "index entries found: " + rowIndex.getEntryCount() + " expected: " +
                requiredIndexEntries);
         }
-        rowIndex.build().writeTo(rowIndexStream);
-        rowIndexStream.flush();
+        streamFactory.writeIndex(id, rowIndex);
       }
+
       rowIndex.clear();
       rowIndexEntry.clear();
 
       // write the bloom filter to out stream
-      if (bloomFilterStream != null) {
-        bloomFilterIndex.build().writeTo(bloomFilterStream);
-        bloomFilterStream.flush();
+      if (createBloomFilter) {
+        streamFactory.writeBloomFilter(id, bloomFilterIndex);
         bloomFilterIndex.clear();
         bloomFilterEntry.clear();
       }
@@ -2463,17 +2198,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   @VisibleForTesting
-  public FSDataOutputStream getStream() throws IOException {
-    if (rawWriter == null) {
-      rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
-                            fs.getDefaultReplication(path), blockSize);
-      rawWriter.writeBytes(OrcFile.MAGIC);
-      headerLength = rawWriter.getPos();
-      writer = new OutStream("metadata", bufferSize, codec,
-                             new DirectStream(rawWriter));
-      protobufWriter = CodedOutputStream.newInstance(writer);
-    }
-    return rawWriter;
+  public void ensureStream() throws IOException {
+    physWriter.initialize();
   }
 
   private void createRowIndexEntry() throws IOException {
@@ -2482,7 +2208,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   }
 
   private void flushStripe() throws IOException {
-    getStream();
+    ensureStream();
     if (buildIndex && rowsInIndex != 0) {
       createRowIndexEntry();
     }
@@ -2493,98 +2219,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       // finalize the data for the stripe
       int requiredIndexEntries = rowIndexStride == 0 ? 0 :
           (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
-      OrcProto.StripeFooter.Builder builder =
-          OrcProto.StripeFooter.newBuilder();
+      OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder();
+      OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
+          .newBuilder().setNumberOfRows(rowsInStripe);
       treeWriter.writeStripe(builder, requiredIndexEntries);
-      long indexSize = 0;
-      long dataSize = 0;
-      for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
-        BufferedStream stream = pair.getValue();
-        if (!stream.isSuppressed()) {
-          stream.flush();
-          StreamName name = pair.getKey();
-          long streamSize = pair.getValue().getOutputSize();
-          builder.addStreams(OrcProto.Stream.newBuilder()
-                             .setColumn(name.getColumn())
-                             .setKind(name.getKind())
-                             .setLength(streamSize));
-          if (StreamName.Area.INDEX == name.getArea()) {
-            indexSize += streamSize;
-          } else {
-            dataSize += streamSize;
-          }
-        }
-      }
-      OrcProto.StripeFooter footer = builder.build();
-
-      // Do we need to pad the file so the stripe doesn't straddle a block
-      // boundary?
-      long start = rawWriter.getPos();
-      final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize();
-      final long available = blockSize - (start % blockSize);
-      final long overflow = currentStripeSize - adjustedStripeSize;
-      final float availRatio = (float) available / (float) defaultStripeSize;
-
-      if (availRatio > 0.0f && availRatio < 1.0f
-          && availRatio > paddingTolerance) {
-        // adjust default stripe size to fit into remaining space, also adjust
-        // the next stripe for correction based on the current stripe size
-        // and user specified padding tolerance. Since stripe size can overflow
-        // the default stripe size we should apply this correction to avoid
-        // writing portion of last stripe to next hdfs block.
-        double correction = overflow > 0 ? (double) overflow
-            / (double) adjustedStripeSize : 0.0;
-
-        // correction should not be greater than user specified padding
-        // tolerance
-        correction = correction > paddingTolerance ? paddingTolerance
-            : correction;
-
-        // adjust next stripe size based on current stripe estimate correction
-        adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
-      } else if (availRatio >= 1.0) {
-        adjustedStripeSize = defaultStripeSize;
-      }
-
-      if (availRatio < paddingTolerance && addBlockPadding) {
-        long padding = blockSize - (start % blockSize);
-        byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
-        LOG.info(String.format("Padding ORC by %d bytes (<=  %.2f * %d)", 
-            padding, availRatio, defaultStripeSize));
-        start += padding;
-        while (padding > 0) {
-          int writeLen = (int) Math.min(padding, pad.length);
-          rawWriter.write(pad, 0, writeLen);
-          padding -= writeLen;
-        }
-        adjustedStripeSize = defaultStripeSize;
-      } else if (currentStripeSize < blockSize
-          && (start % blockSize) + currentStripeSize > blockSize) {
-        // even if you don't pad, reset the default stripe size when crossing a
-        // block boundary
-        adjustedStripeSize = defaultStripeSize;
-      }
-
-      // write out the data streams
-      for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
-        BufferedStream stream = pair.getValue();
-        if (!stream.isSuppressed()) {
-          stream.spillTo(rawWriter);
-        }
-        stream.clear();
-      }
-      footer.writeTo(protobufWriter);
-      protobufWriter.flush();
-      writer.flush();
-      long footerLength = rawWriter.getPos() - start - dataSize - indexSize;
-      OrcProto.StripeInformation dirEntry =
-          OrcProto.StripeInformation.newBuilder()
-              .setOffset(start)
-              .setNumberOfRows(rowsInStripe)
-              .setIndexLength(indexSize)
-              .setDataLength(dataSize)
-              .setFooterLength(footerLength).build();
-      stripes.add(dirEntry);
+      physWriter.finalizeStripe(builder, dirEntry);
+      stripes.add(dirEntry.build());
       rowCount += rowsInStripe;
       rowsInStripe = 0;
     }
@@ -2645,17 +2285,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     return total;
   }
 
-  private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
-    switch (kind) {
-      case NONE: return OrcProto.CompressionKind.NONE;
-      case ZLIB: return OrcProto.CompressionKind.ZLIB;
-      case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
-      case LZO: return OrcProto.CompressionKind.LZO;
-      default:
-        throw new IllegalArgumentException("Unknown compression " + kind);
-    }
-  }
-
   private void writeFileStatistics(OrcProto.Footer.Builder builder,
                                    TreeWriter writer) throws IOException {
     builder.addStatistics(writer.fileStatistics.serialize());
@@ -2664,26 +2293,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     }
   }
 
-  private int writeMetadata() throws IOException {
-    getStream();
+  private void writeMetadata() throws IOException {
+    ensureStream();
     OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder();
     for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) {
       builder.addStripeStats(ssb.build());
     }
 
-    long startPosn = rawWriter.getPos();
-    OrcProto.Metadata metadata = builder.build();
-    metadata.writeTo(protobufWriter);
-    protobufWriter.flush();
-    writer.flush();
-    return (int) (rawWriter.getPos() - startPosn);
+    physWriter.writeFileMetadata(builder);
   }
 
-  private int writeFooter(long bodyLength) throws IOException {
-    getStream();
+  private void writeFooter() throws IOException {
+    ensureStream();
     OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
-    builder.setContentLength(bodyLength);
-    builder.setHeaderLength(headerLength);
     builder.setNumberOfRows(rowCount);
     builder.setRowIndexStride(rowIndexStride);
     // populate raw data size
@@ -2701,45 +2323,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
         .setName(entry.getKey()).setValue(entry.getValue()));
     }
-    long startPosn = rawWriter.getPos();
-    OrcProto.Footer footer = builder.build();
-    footer.writeTo(protobufWriter);
-    protobufWriter.flush();
-    writer.flush();
-    return (int) (rawWriter.getPos() - startPosn);
+    physWriter.writeFileFooter(builder);
   }
 
-  private int writePostScript(int footerLength, int metadataLength) throws IOException {
+  private void writePostScript() throws IOException {
     OrcProto.PostScript.Builder builder =
       OrcProto.PostScript.newBuilder()
-        .setCompression(writeCompressionKind(compress))
-        .setFooterLength(footerLength)
-        .setMetadataLength(metadataLength)
         .setMagic(OrcFile.MAGIC)
         .addVersion(version.getMajor())
         .addVersion(version.getMinor())
         .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
-    if (compress != CompressionKind.NONE) {
-      builder.setCompressionBlockSize(bufferSize);
-    }
-    OrcProto.PostScript ps = builder.build();
-    // need to write this uncompressed
-    long startPosn = rawWriter.getPos();
-    ps.writeTo(rawWriter);
-    long length = rawWriter.getPos() - startPosn;
-    if (length > 255) {
-      throw new IllegalArgumentException("PostScript too large at " + length);
-    }
-    return (int) length;
+    physWriter.writePostScript(builder);
   }
 
   private long estimateStripeSize() {
-    long result = 0;
-    for(BufferedStream stream: streams.values()) {
-      result += stream.getBufferSize();
-    }
-    result += treeWriter.estimateMemory();
-    return result;
+    return physWriter.estimateMemory() + treeWriter.estimateMemory();
   }
 
   @Override
@@ -2785,11 +2383,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     memoryManager.removeWriter(path);
     // actually close the file
     flushStripe();
-    int metadataLength = writeMetadata();
-    int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
-    rawWriter.writeByte(writePostScript(footerLength, metadataLength));
-    rawWriter.close();
-
+    writeMetadata();
+    writeFooter();
+    writePostScript();
+    physWriter.close();
   }
 
   /**
@@ -2819,13 +2416,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
       if (callback != null) {
         callback.preFooterWrite(callbackContext);
       }
-      int metaLength = writeMetadata();
-      int footLength = writeFooter(rawWriter.getPos() - metaLength);
-      rawWriter.writeByte(writePostScript(footLength, metaLength));
+      writeMetadata();
+      writeFooter();
+      writePostScript();
       stripesAtLastFlush = stripes.size();
-      rawWriter.hflush();
+      physWriter.flush();
     }
-    return rawWriter.getPos();
+    return physWriter.getRawWriterPosition();
   }
 
   @Override
@@ -2839,26 +2436,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     checkArgument(stripeStatistics != null,
         "Stripe statistics must not be null");
 
-    getStream();
-    long start = rawWriter.getPos();
-    long availBlockSpace = blockSize - (start % blockSize);
-
-    // see if stripe can fit in the current hdfs block, else pad the remaining
-    // space in the block
-    if (length < blockSize && length > availBlockSpace &&
-        addBlockPadding) {
-      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
-      LOG.info(String.format("Padding ORC by %d bytes while merging..",
-          availBlockSpace));
-      start += availBlockSpace;
-      while (availBlockSpace > 0) {
-        int writeLen = (int) Math.min(availBlockSpace, pad.length);
-        rawWriter.write(pad, 0, writeLen);
-        availBlockSpace -= writeLen;
-      }
-    }
+    ensureStream();
+    OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation.newBuilder();
+    physWriter.appendRawStripe(stripe, offset, length, dirEntry);
 
-    rawWriter.write(stripe);
     rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues();
     rowCount += rowsInStripe;
 
@@ -2869,15 +2450,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     updateFileStatistics(stripeStatistics);
 
     // update stripe information
-    OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation
-        .newBuilder()
-        .setOffset(start)
-        .setNumberOfRows(rowsInStripe)
+    stripes.add(dirEntry.setNumberOfRows(rowsInStripe)
         .setIndexLength(stripeInfo.getIndexLength())
         .setDataLength(stripeInfo.getDataLength())
         .setFooterLength(stripeInfo.getFooterLength())
-        .build();
-    stripes.add(dirEntry);
+        .build());
 
     // reset it after writing the stripe
     rowsInStripe = 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java b/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
index 289a86e..efa3ffb 100644
--- a/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
+++ b/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
@@ -28,37 +28,37 @@ public class TestOrcWideTable {
 
   @Test
   public void testBufferSizeFor1Col() throws IOException {
-    assertEquals(128 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+    assertEquals(128 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
         1, 128*1024));
   }
 
   @Test
   public void testBufferSizeFor50Col() throws IOException {
-    assertEquals(256 * 1024, WriterImpl.getEstimatedBufferSize(256 * 1024 * 1024,
+    assertEquals(256 * 1024, PhysicalFsWriter.getEstimatedBufferSize(256 * 1024 * 1024,
         50, 256*1024));
   }
 
   @Test
   public void testBufferSizeFor1000Col() throws IOException {
-    assertEquals(32 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+    assertEquals(32 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
         1000, 128*1024));
   }
 
   @Test
   public void testBufferSizeFor2000Col() throws IOException {
-    assertEquals(16 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+    assertEquals(16 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
         2000, 256*1024));
   }
 
   @Test
   public void testBufferSizeFor4000Col() throws IOException {
-    assertEquals(8 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+    assertEquals(8 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
         4000, 256*1024));
   }
 
   @Test
   public void testBufferSizeFor25000Col() throws IOException {
-    assertEquals(4 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+    assertEquals(4 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
         25000, 256*1024));
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 8e52907..075c3b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -60,6 +60,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspe
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.orc.impl.PhysicalWriter;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -312,4 +315,9 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
     flushInternalBatch();
     super.close();
   }
+
+  @VisibleForTesting
+  PhysicalWriter getPhysicalWriter() {
+    return physWriter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index d6b48a3..197c1d2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -122,6 +122,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.orc.*;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -2138,8 +2139,7 @@ public class TestInputOutputFormat {
       writer.addRow(new MyRow(i, 2*i));
     }
     writer.close();
-    ((MockOutputStream) ((WriterImpl) writer).getStream())
-        .setBlocks(new MockBlock("host0", "host1"));
+    getStreamFromWriter(writer).setBlocks(new MockBlock("host0", "host1"));
 
     // call getsplits
     HiveInputFormat<?,?> inputFormat =
@@ -2160,6 +2160,11 @@ public class TestInputOutputFormat {
     assertEquals(false, reader.next(key, value));
   }
 
+  private MockOutputStream getStreamFromWriter(Writer writer) throws IOException {
+    PhysicalFsWriter pfr = (PhysicalFsWriter)((WriterImpl) writer).getPhysicalWriter();
+    return (MockOutputStream)pfr.getStream();
+  }
+
   /**
    * Test vectorization, non-acid, non-combine.
    * @throws Exception
@@ -2185,8 +2190,7 @@ public class TestInputOutputFormat {
       writer.addRow(new MyRow(i, 2*i));
     }
     writer.close();
-    ((MockOutputStream) ((WriterImpl) writer).getStream())
-        .setBlocks(new MockBlock("host0", "host1"));
+    getStreamFromWriter(writer).setBlocks(new MockBlock("host0", "host1"));
 
     // call getsplits
     conf.setInt(hive_metastoreConstants.BUCKET_COUNT, 3);
@@ -2226,8 +2230,7 @@ public class TestInputOutputFormat {
     }
     WriterImpl baseWriter = (WriterImpl) writer.getWriter();
     writer.close(false);
-    ((MockOutputStream) baseWriter.getStream())
-        .setBlocks(new MockBlock("host0", "host1"));
+    getStreamFromWriter(baseWriter).setBlocks(new MockBlock("host0", "host1"));
 
     // call getsplits
     HiveInputFormat<?, ?> inputFormat =
@@ -2305,7 +2308,7 @@ public class TestInputOutputFormat {
       writer.addRow(new MyRow(i, 2*i));
     }
     writer.close();
-    MockOutputStream outputStream = (MockOutputStream) ((WriterImpl) writer).getStream();
+    MockOutputStream outputStream = getStreamFromWriter(writer);
     outputStream.setBlocks(new MockBlock("host0", "host1"));
     int length0 = outputStream.file.length;
     writer =
@@ -2316,7 +2319,7 @@ public class TestInputOutputFormat {
       writer.addRow(new MyRow(i, 2*i));
     }
     writer.close();
-    outputStream = (MockOutputStream) ((WriterImpl) writer).getStream();
+    outputStream = getStreamFromWriter(writer);
     outputStream.setBlocks(new MockBlock("host1", "host2"));
 
     // call getsplits
@@ -2383,7 +2386,7 @@ public class TestInputOutputFormat {
     WriterImpl baseWriter = (WriterImpl) writer.getWriter();
     writer.close(false);
 
-    MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream();
+    MockOutputStream outputStream = getStreamFromWriter(baseWriter);
     outputStream.setBlocks(new MockBlock("host1", "host2"));
 
     // write a delta file in partition 0
@@ -2394,7 +2397,7 @@ public class TestInputOutputFormat {
       writer.insert(10, new MyRow(i, 2*i));
     }
     WriterImpl deltaWriter = (WriterImpl) writer.getWriter();
-    outputStream = (MockOutputStream) deltaWriter.getStream();
+    outputStream = getStreamFromWriter(deltaWriter);
     writer.close(false);
     outputStream.setBlocks(new MockBlock("host1", "host2"));
 
@@ -2407,7 +2410,7 @@ public class TestInputOutputFormat {
               .bufferSize(1024)
               .inspector(inspector));
       orc.addRow(new MyRow(1, 2));
-      outputStream = (MockOutputStream) ((WriterImpl) orc).getStream();
+      outputStream = getStreamFromWriter(orc);
       orc.close();
       outputStream.setBlocks(new MockBlock("host3", "host4"));
     }


Mime
View raw message