hadoop-hive-commits mailing list archives

From: zs...@apache.org
Subject: svn commit: r770548 [1/3] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/metadata/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/test/org/apa...
Date: Fri, 01 May 2009 06:31:31 GMT
Author: zshao
Date: Fri May  1 06:31:30 2009
New Revision: 770548

URL: http://svn.apache.org/viewvc?rev=770548&view=rev
Log:
HIVE-352. Column-based storage format RCFile. (Yongqiang He via zshao)

Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/PerformTestRCFileAndSeqFile.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/columnarserde_create_shortcut.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/input_columnarserde.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/rcfile_bigdata.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/rcfile_columnar.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/columnarserde_create_shortcut.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_columnarserde.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_bigdata.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_columnar.q.out
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefArrayWritable.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ColumnarStructObjectInspector.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyFactory.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=770548&r1=770547&r2=770548&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri May  1 06:31:30 2009
@@ -21,6 +21,9 @@
     HIVE-464. Add alter partition method to the meastore interface.
     (Prasad Chakka via zshao)
 
+    HIVE-352. Column-based storage format RCFile.
+    (Yongqiang He via zshao)
+
   IMPROVEMENTS
     HIVE-389. Option to build without ivy (jssarma)
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=770548&r1=770547&r2=770548&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Fri May  1 06:31:30 2009
@@ -61,6 +61,7 @@
 import org.apache.hadoop.hive.ql.plan.showTablesDesc;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.util.StringUtils;
@@ -530,6 +531,8 @@
           && !tbl.getSerializationLib().equals(
           LazySimpleSerDe.class.getName())
           && !tbl.getSerializationLib().equals(
+          ColumnarSerDe.class.getName())
+          && !tbl.getSerializationLib().equals(
           DynamicSerDe.class.getName())) {
         console
             .printError("Replace columns is not supported for this table. SerDe may be incompatible.");

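For readability: the chained equals() checks above amount to a whitelist of SerDes for which DDLTask allows ALTER TABLE ... REPLACE COLUMNS, and this patch adds ColumnarSerDe to that list. Below is a minimal, hypothetical sketch of an equivalent check, not part of the patch; the class names come from the imports shown above, and the inclusion of MetadataTypedColumnsetSerDe is an assumption, since that part of the condition lies outside this hunk.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;

public class ReplaceColumnsSerDeCheck {
  // Hypothetical whitelist equivalent to DDLTask's chained !equals(...) checks
  // after this patch; MetadataTypedColumnsetSerDe's membership is assumed.
  private static final Set<String> REPLACE_COLUMNS_SERDES = new HashSet<String>(
      Arrays.asList(MetadataTypedColumnsetSerDe.class.getName(),
          LazySimpleSerDe.class.getName(),
          ColumnarSerDe.class.getName(),
          DynamicSerDe.class.getName()));

  /** Returns true if REPLACE COLUMNS is supported for the given SerDe class. */
  public static boolean supportsReplaceColumns(String serializationLib) {
    return REPLACE_COLUMNS_SERDES.contains(serializationLib);
  }
}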
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=770548&r1=770547&r2=770548&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri May  1 06:31:30 2009
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -501,6 +502,30 @@
                                       keyClass, valClass, compressionType, codec));
 
   }
+  
+  /**
+   * Create an RCFile writer based on the job configuration. Uses the
+   * user-supplied compression flag (rather than obtaining it from the job
+   * configuration).
+   * 
+   * @param jc
+   *          Job configuration
+   * @param fs
+   *          File System to create file in
+   * @param file
+   *          Path to be created
+   * @return a writer over the created RCFile
+   */
+  public static RCFile.Writer createRCFileWriter(JobConf jc, FileSystem fs,
+      Path file, boolean isCompressed) throws IOException {
+    CompressionCodec codec = null;
+    Class<?> codecClass = null;
+    if (isCompressed) {
+      codecClass = FileOutputFormat.getOutputCompressorClass(jc,
+          DefaultCodec.class);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
+    }
+    return new RCFile.Writer(fs, jc, file, null, codec);
+  }
 
   /**
    * Shamelessly cloned from GenericOptionsParser

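The new Utilities.createRCFileWriter helper can be exercised roughly as follows. This is a hypothetical sketch: the surrounding class and method names are illustrative and not part of this patch; only the Utilities.createRCFileWriter call itself comes from the change above.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.mapred.JobConf;

public class RCFileWriterExample {
  public static RCFile.Writer openWriter(JobConf jc, Path outPath,
      boolean isCompressed) throws IOException {
    FileSystem fs = outPath.getFileSystem(jc);
    // The compression flag is supplied by the caller rather than read from the
    // job configuration; when set, the codec class is resolved through
    // FileOutputFormat.getOutputCompressorClass (DefaultCodec if unset).
    return Utilities.createRCFileWriter(jc, fs, outPath, isCompressed);
  }
}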
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java?rev=770548&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java Fri May  1 06:31:30 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse 
+ * (possibly native) compression/decompression codecs.
+ */
+public class CodecPool {
+  private static final Log LOG = LogFactory.getLog(CodecPool.class);
+  
+  /**
+   * A global compressor pool used to save the expensive 
+   * construction/destruction of (possibly native) compression codecs.
+   */
+  private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
+    new HashMap<Class<Compressor>, List<Compressor>>();
+  
+  /**
+   * A global decompressor pool used to save the expensive 
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
+    new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+                             Class<? extends T> codecClass) {
+    T codec = null;
+    
+    // Check if an appropriate codec is available
+    synchronized (pool) {
+      if (pool.containsKey(codecClass)) {
+        List<T> codecList = pool.get(codecClass);
+        
+        if (codecList != null) {
+          synchronized (codecList) {
+            if (!codecList.isEmpty()) {
+              codec = codecList.remove(codecList.size()-1);
+            }
+          }
+        }
+      }
+    }
+    
+    return codec;
+  }
+
+  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+    if (codec != null) {
+      Class<T> codecClass = (Class<T>) codec.getClass();
+      synchronized (pool) {
+        if (!pool.containsKey(codecClass)) {
+          pool.put(codecClass, new ArrayList<T>());
+        }
+
+        List<T> codecList = pool.get(codecClass);
+        synchronized (codecList) {
+          codecList.add(codec);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
+   * pool or a new one.
+   *
+   * @param codec the <code>CompressionCodec</code> for which to get the 
+   *              <code>Compressor</code>
+   * @return <code>Compressor</code> for the given 
+   *         <code>CompressionCodec</code> from the pool or a new one
+   */
+  public static Compressor getCompressor(CompressionCodec codec) {
+    Compressor compressor = (Compressor) borrow(compressorPool, codec.getCompressorType());
+    if (compressor == null) {
+      compressor = codec.createCompressor();
+      LOG.info("Got brand-new compressor");
+    } else {
+      LOG.debug("Got recycled compressor");
+    }
+    return compressor;
+  }
+  
+  /**
+   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *  
+   * @param codec the <code>CompressionCodec</code> for which to get the 
+   *              <code>Decompressor</code>
+   * @return <code>Decompressor</code> for the given 
+   *         <code>CompressionCodec</code> from the pool or a new one
+   */
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    Decompressor decompressor = (Decompressor) borrow(decompressorPool, codec.getDecompressorType());
+    if (decompressor == null) {
+      decompressor = codec.createDecompressor();
+      LOG.info("Got brand-new decompressor");
+    } else {
+      LOG.debug("Got recycled decompressor");
+    }
+    return decompressor;
+  }
+  
+  /**
+   * Return the {@link Compressor} to the pool.
+   * 
+   * @param compressor the <code>Compressor</code> to be returned to the pool
+   */
+  public static void returnCompressor(Compressor compressor) {
+    if (compressor == null) {
+      return;
+    }
+    compressor.reset();
+    payback(compressorPool, compressor);
+  }
+  
+  /**
+   * Return the {@link Decompressor} to the pool.
+   * 
+   * @param decompressor the <code>Decompressor</code> to be returned to the 
+   *                     pool
+   */
+  public static void returnDecompressor(Decompressor decompressor) {
+    if (decompressor == null) {
+      return;
+    }
+    decompressor.reset();
+    payback(decompressorPool, decompressor);
+  }
+}
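A hypothetical usage sketch of the pool's borrow/return pattern follows. The GzipCodec choice and the surrounding method are illustrative, not part of this patch; only CodecPool.getCompressor and CodecPool.returnCompressor come from the class above.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecPoolExample {
  public static void writeCompressed(OutputStream rawOut, byte[] data,
      Configuration conf) throws IOException {
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    // Borrow a (possibly native) compressor from the pool instead of creating one.
    Compressor compressor = CodecPool.getCompressor(codec);
    try {
      CompressionOutputStream out = codec.createOutputStream(rawOut, compressor);
      out.write(data);
      out.finish();
    } finally {
      // Always return the compressor so later writers can reuse it.
      CodecPool.returnCompressor(compressor);
    }
  }
}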

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=770548&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Fri May  1 06:31:30 2009
@@ -0,0 +1,1352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.rmi.server.UID;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CodecPool;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * <code>RCFile</code>s, short for Record Columnar Files, are flat files
+ * consisting of binary key/value pairs, which share much similarity with
+ * <code>SequenceFile</code>s.
+ * 
+ * RCFile stores the columns of a table in a record columnar way. It first
+ * partitions rows horizontally into row splits, and then it vertically
+ * partitions each row split in a columnar way. RCFile first stores the
+ * metadata of a row split as the key part of a record, and all the data of the
+ * row split as the value part. When writing, RCFile.Writer first holds records'
+ * value bytes in memory, and determines a row split if the raw byte size of the
+ * buffered records overflows a given parameter, <tt>Writer.columnsBufferSize</tt>,
+ * which can be set like: <code>conf.setInt(COLUMNS_BUFFER_SIZE_CONF_STR,
+          4 * 1024 * 1024)</code>.
+ * <p>
+ * <code>RCFile</code> provides a {@link Writer} and a {@link Reader} for
+ * writing and reading respectively.
+ * </p>
+ * 
+ * <p>
+ * RCFile compresses values in a more fine-grained manner than record-level
+ * compression. However, it does not yet support compressing the key part. The
+ * actual compression algorithm used to compress keys and/or values can be
+ * specified by using the appropriate {@link CompressionCodec}.
+ * </p>
+ * 
+ * <p>
+ * The {@link Reader} is used to read and interpret the bytes of an RCFile.
+ * </p>
+ * 
+ * <h4 id="Formats">RCFile Formats</h4>
+ * 
+ * 
+ * <h5 id="Header">RC Header</h5>
+ * <ul>
+ * <li>version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of
+ * actual version number (e.g. SEQ4 or SEQ6)</li>
+ * <li>keyClassName - KeyBuffer's class name</li>
+ * <li>valueClassName - ValueBuffer's class name</li>
+ * <li>compression - A boolean which specifies if compression is turned on for
+ * keys/values in this file.</li>
+ * <li>blockCompression - always false; this field is kept for compatibility
+ * with SequenceFile's format</li>
+ * <li>compression codec - <code>CompressionCodec</code> class which is used for
+ * compression of keys and/or values (if compression is enabled).</li>
+ * <li>metadata - {@link Metadata} for this file.</li>
+ * <li>sync - A sync marker to denote end of the header.</li>
+ * </ul>
+ * 
+ * <h5>RCFile Format</h5>
+ * <ul>
+ * <li><a href="#Header">Header</a></li>
+ * <li>Record
+ * <li>Key part
+ * <ul>
+ * <li>Record length in bytes</li>
+ * <li>Key length in bytes</li>
+ * <li>Number_of_rows_in_this_record(vint)</li>
+ * <li>Column_1_ondisk_length(vint)</li>
+ * <li>Column_1_row_1_value_plain_length</li>
+ * <li>Column_1_row_2_value_plain_length</li>
+ * <li>...</li>
+ * <li>Column_2_ondisk_length(vint)</li>
+ * <li>Column_2_row_1_value_plain_length</li>
+ * <li>Column_2_row_2_value_plain_length</li>
+ * <li>...</li>
+ * </ul>
+ * </li>
+ * </li>
+ * <li>Value part
+ * <ul>
+ * <li>Compressed or plain data of [column_1_row_1_value,
+ * column_1_row_2_value,....]</li>
+ * <li>Compressed or plain data of [column_2_row_1_value,
+ * column_2_row_2_value,....]</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * 
+ */
+public class RCFile {
+
+  private static final Log LOG = LogFactory.getLog(RCFile.class);
+
+  public static String RECORD_INTERVAL_CONF_STR = "hive.io.rcfile.record.interval";
+
+  public static String COLUMN_NUMBER_METADATA_STR = "hive.io.rcfile.column.number";
+
+  public static String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
+
+  /*
+   * the header and sync are kept from SequenceFile, for compatibility with
+   * SequenceFile's format.
+   */
+  private static final byte VERSION_WITH_METADATA = (byte) 6;
+  private static byte[] VERSION = new byte[] { (byte) 'S', (byte) 'E',
+      (byte) 'Q', VERSION_WITH_METADATA };
+
+  private static final int SYNC_ESCAPE = -1; // "length" of sync entries
+  private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
+  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; // escape + hash
+
+  /** The number of bytes between sync points. */
+  public static final int SYNC_INTERVAL = 100 * SYNC_SIZE;
+
+  /**
+   * KeyBuffer is the key of each record in RCFile. Its on-disk layout is as
+   * below:
+   * 
+   * <ul>
+   * <li>record length in bytes: the sum of bytes used to store the key part
+   * and the value part,</li>
+   * <li>key length in bytes: how many bytes are used by the key part,</li>
+   * <li>number_of_rows_in_this_record(vint),</li>
+   * <li>column_1_ondisk_length(vint),</li>
+   * <li>column_1_row_1_value_plain_length,</li>
+   * <li>column_1_row_2_value_plain_length,</li>
+   * <li>....</li>
+   * <li>column_2_ondisk_length(vint),</li>
+   * <li>column_2_row_1_value_plain_length,</li>
+   * <li>column_2_row_2_value_plain_length,</li>
+   * <li>.... .</li>
+   * <li>{the end of the key part}</li>
+   * </ul>
+   */
+  static class KeyBuffer implements Writable {
+    // each column's value length in a split
+    private int[] eachColumnValueLen = null;
+    private int[] eachColumnUncompressedValueLen = null;
+    // stores each cell's length of a column in one DataOutputBuffer element
+    private DataOutputBuffer[] allCellValLenBuffer = null;
+    // how many rows in this split
+    private int numberRows = 0;
+    // how many columns
+    private int columnNumber = 0;
+
+    KeyBuffer(int columnNumber) {
+      this(0, columnNumber);
+    }
+
+    KeyBuffer(int numberRows, int columnNum) {
+      columnNumber = columnNum;
+      eachColumnValueLen = new int[columnNumber];
+      eachColumnUncompressedValueLen = new int[columnNumber];
+      allCellValLenBuffer = new DataOutputBuffer[columnNumber];
+      this.numberRows = numberRows;
+    }
+
+    /**
+     * Add in a new column's metadata.
+     * 
+     * @param columnValueLen
+     *          the total number of bytes of this column's values in this split
+     * @param colValLenBuffer
+     *          each cell's length of this column in this split
+     */
+    void setColumnLenInfo(int columnValueLen, DataOutputBuffer colValLenBuffer,
+        int columnUncompressedValueLen, int columnIndex) {
+      eachColumnValueLen[columnIndex] = columnValueLen;
+      eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
+      allCellValLenBuffer[columnIndex] = colValLenBuffer;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      eachColumnValueLen = new int[columnNumber];
+      eachColumnUncompressedValueLen = new int[columnNumber];
+      allCellValLenBuffer = new DataOutputBuffer[columnNumber];
+
+      numberRows = WritableUtils.readVInt(in);
+      for (int i = 0; i < columnNumber; i++) {
+        eachColumnValueLen[i] = WritableUtils.readVInt(in);
+        eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
+        int bufLen = WritableUtils.readVInt(in);
+        if (allCellValLenBuffer[i] == null)
+          allCellValLenBuffer[i] = new DataOutputBuffer();
+        else
+          allCellValLenBuffer[i].reset();
+        allCellValLenBuffer[i].write(in, bufLen);
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      // out.writeInt(numberRows);
+      WritableUtils.writeVLong(out, numberRows);
+      for (int i = 0; i < eachColumnValueLen.length; i++) {
+        WritableUtils.writeVLong(out, eachColumnValueLen[i]);
+        WritableUtils.writeVLong(out, eachColumnUncompressedValueLen[i]);
+        DataOutputBuffer colRowsLenBuf = allCellValLenBuffer[i];
+        int bufLen = colRowsLenBuf.getLength();
+        WritableUtils.writeVLong(out, bufLen);
+        out.write(colRowsLenBuf.getData(), 0, bufLen);
+      }
+    }
+
+    /**
+     * get number of bytes to store the keyBuffer
+     * 
+     * @return number of bytes used to store this KeyBuffer on disk
+     * @throws IOException
+     */
+    public int getSize() throws IOException {
+      int ret = 0;
+      ret += WritableUtils.getVIntSize(numberRows);
+      for (int i = 0; i < eachColumnValueLen.length; i++) {
+        ret += WritableUtils.getVIntSize(eachColumnValueLen[i]);
+        ret += WritableUtils.getVIntSize(eachColumnUncompressedValueLen[i]);
+        ret += WritableUtils.getVIntSize(allCellValLenBuffer[i].getLength());
+        ret += allCellValLenBuffer[i].getLength();
+      }
+
+      return ret;
+    }
+  }
+
+  /**
+   * ValueBuffer is the value of each record in RCFile. Its on-disk layout is as
+   * below:
+   * <ul>
+   * <li>Compressed or plain data of [column_1_row_1_value,
+   * column_1_row_2_value,....]</li>
+   * <li>Compressed or plain data of [column_2_row_1_value,
+   * column_2_row_2_value,....]</li>
+   * </ul>
+   */
+  static class ValueBuffer implements Writable {
+    // used to load columns' value into memory
+    private DataOutputBuffer[] loadedColumnsValueBuffer = null;
+
+    boolean inited = false;
+
+    // used for readFields
+    KeyBuffer keyBuffer;
+    private int columnNumber = 0;
+
+    // set to true for columns whose loading into memory should be skipped.
+    boolean[] skippedColIDs = null;
+
+    CompressionCodec codec;
+
+    Decompressor valDecompressor = null;
+    DataInputBuffer decompressBuffer = new DataInputBuffer();
+    CompressionInputStream deflatFilter = null;
+
+    public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
+      this(keyBuffer, null);
+    }
+
+    public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs)
+        throws IOException {
+      this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null);
+    }
+
+    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+        boolean[] skippedCols, CompressionCodec codec) throws IOException {
+
+      keyBuffer = currentKey;
+      this.columnNumber = columnNumber;
+
+      if (skippedCols != null && skippedCols.length > 0) {
+        skippedColIDs = skippedCols;
+      } else {
+        skippedColIDs = new boolean[columnNumber];
+        for (int i = 0; i < skippedColIDs.length; i++)
+          skippedColIDs[i] = false;
+      }
+
+      int skipped = 0;
+      if (skippedColIDs != null) {
+        for (boolean currentSkip : skippedColIDs)
+          if (currentSkip)
+            skipped++;
+      }
+      loadedColumnsValueBuffer = new DataOutputBuffer[columnNumber - skipped];
+      this.codec = codec;
+      if (codec != null) {
+        valDecompressor = CodecPool.getDecompressor(codec);
+        deflatFilter = codec.createInputStream(decompressBuffer,
+            valDecompressor);
+      }
+
+      for (int k = 0, readIndex = 0; k < columnNumber; k++) {
+        if (skippedColIDs[k])
+          continue;
+        loadedColumnsValueBuffer[readIndex] = new DataOutputBuffer();
+        readIndex++;
+      }
+    }
+
+    public void setColumnValueBuffer(DataOutputBuffer valBuffer, int addIndex) {
+      loadedColumnsValueBuffer[addIndex] = valBuffer;
+    }
+
+    DataOutputBuffer compressedData = new DataOutputBuffer();
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int addIndex = 0;
+      int skipTotal = 0;
+      for (int i = 0; i < columnNumber; i++) {
+        int vaRowsLen = keyBuffer.eachColumnValueLen[i];
+        // skip this column
+        if (skippedColIDs[i]) {
+          skipTotal += vaRowsLen;
+          continue;
+        }
+
+        if (skipTotal != 0) {
+          in.skipBytes(skipTotal);
+          skipTotal = 0;
+        }
+
+        DataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
+        valBuf.reset();
+        if (codec != null) {
+          decompressBuffer.reset();
+          DataInputStream valueIn = new DataInputStream(deflatFilter);
+          deflatFilter.resetState();
+          compressedData.reset();
+          compressedData.write(in, vaRowsLen);
+          decompressBuffer.reset(compressedData.getData(), vaRowsLen);
+          valBuf.write(valueIn, keyBuffer.eachColumnUncompressedValueLen[i]);
+        } else {
+          valBuf.write(in, vaRowsLen);
+        }
+        addIndex++;
+      }
+
+      if (skipTotal != 0) {
+        in.skipBytes(skipTotal);
+      }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
+        DataOutputBuffer currentBuf = loadedColumnsValueBuffer[i];
+        out.write(currentBuf.getData(), 0, currentBuf.getLength());
+      }
+    }
+
+    public void clearColumnBuffer() throws IOException {
+      decompressBuffer.reset();
+    }
+
+    public void close() {
+      for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
+        IOUtils.closeStream(loadedColumnsValueBuffer[i]);
+      }
+      if (codec != null) {
+        IOUtils.closeStream(decompressBuffer);
+        CodecPool.returnDecompressor(valDecompressor);
+      }
+    }
+  }
+
+  /**
+   * Write KeyBuffer/ValueBuffer pairs to a RCFile. RCFile's format is
+   * compatible with SequenceFile's.
+   * 
+   */
+  public static class Writer {
+
+    Configuration conf;
+    FSDataOutputStream out;
+
+    CompressionCodec codec = null;
+    Metadata metadata = null;
+    Compressor compressor = null;
+
+    // Insert a globally unique 16-byte value every few entries, so that one
+    // can seek into the middle of a file and then synchronize with record
+    // starts and ends by scanning for this value.
+    long lastSyncPos; // position of last sync
+    byte[] sync; // 16 random bytes
+    {
+      try {
+        MessageDigest digester = MessageDigest.getInstance("MD5");
+        long time = System.currentTimeMillis();
+        digester.update((new UID() + "@" + time).getBytes());
+        sync = digester.digest();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // how many records the writer buffers before it writes to disk
+    private int RECORD_INTERVAL = Integer.MAX_VALUE;
+    // the max size of memory for buffering records before writing them out
+    private int columnsBufferSize = 4 * 1024 * 1024; // 4M
+    // the conf string for COLUMNS_BUFFER_SIZE
+    public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size";
+
+    // how many records already buffered
+    private int bufferedRecords = 0;
+
+    DataOutputBuffer[] compressionBuffer;
+    CompressionOutputStream[] deflateFilter = null;
+    DataOutputStream[] deflateOut = null;
+    private ColumnBuffer[] columnBuffers;
+
+    DataOutputBuffer keyCompressionBuffer;
+    CompressionOutputStream keyDeflateFilter;
+    DataOutputStream keyDeflateOut;
+    Compressor keyCompressor;
+
+    private int columnNumber = 0;
+
+    private int[] columnValuePlainLength;
+
+    KeyBuffer key = null;
+    ValueBuffer value = null;
+
+    /*
+     * used for buffering appends before flushing them out
+     */
+    class ColumnBuffer {
+      // used for buffer a column's values
+      DataOutputBuffer columnValBuffer;
+      // used to store each value's length
+      DataOutputBuffer valLenBuffer;
+
+      ColumnBuffer() throws IOException {
+        columnValBuffer = new DataOutputBuffer();
+        valLenBuffer = new DataOutputBuffer();
+      }
+
+      public void append(BytesRefWritable data) throws IOException {
+        data.writeDataTo(columnValBuffer);
+        WritableUtils.writeVLong(valLenBuffer, data.getLength());
+      }
+
+      public void clear() throws IOException {
+        valLenBuffer.reset();
+        columnValBuffer.reset();
+      }
+    }
+
+    public long getLength() throws IOException {
+      return out.getPos();
+    }
+
+    /** Constructs a RCFile Writer. */
+    public Writer(FileSystem fs, Configuration conf, Path name)
+        throws IOException {
+      this(fs, conf, name, null, new Metadata(), null);
+    }
+
+    /**
+     * Constructs a RCFile Writer.
+     * 
+     * @param fs
+     *          the file system used
+     * @param conf
+     *          the configuration
+     * @param name
+     *          the file name
+     * @throws IOException
+     */
+    public Writer(FileSystem fs, Configuration conf, Path name,
+        Progressable progress, CompressionCodec codec) throws IOException {
+      this(fs, conf, name, null, new Metadata(), codec);
+    }
+
+    /**
+     * Constructs a RCFile Writer.
+     * 
+     * @param fs
+     *          the file system used
+     * @param conf
+     *          the configuration
+     * @param name
+     *          the file name
+     * @param progress
+     * @param metadata
+     * @throws IOException
+     */
+    public Writer(FileSystem fs, Configuration conf, Path name,
+        Progressable progress, Metadata metadata, CompressionCodec codec)
+        throws IOException {
+      this(fs, conf, name, fs.getConf().getInt("io.file.buffer.size", 4096), fs
+          .getDefaultReplication(), fs.getDefaultBlockSize(), progress,
+          metadata, codec);
+    }
+
+    /**
+     * 
+     * Constructs a RCFile Writer.
+     * 
+     * @param fs
+     *          the file system used
+     * @param conf
+     *          the configuration
+     * @param name
+     *          the file name
+     * @param bufferSize
+     * @param replication
+     * @param blockSize
+     * @param progress
+     * @param metadata
+     * @throws IOException
+     */
+    public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
+        short replication, long blockSize, Progressable progress,
+        Metadata metadata, CompressionCodec codec) throws IOException {
+      RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
+      columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
+
+      if (metadata == null)
+        metadata = new Metadata();
+      metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
+          + columnNumber));
+
+      columnsBufferSize = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
+          4 * 1024 * 1024);
+
+      columnValuePlainLength = new int[columnNumber];
+
+      columnBuffers = new ColumnBuffer[columnNumber];
+      for (int i = 0; i < columnNumber; i++) {
+        columnBuffers[i] = new ColumnBuffer();
+      }
+
+      init(name, conf, fs.create(name, true, bufferSize, replication,
+          blockSize, progress), codec, metadata);
+      initializeFileHeader();
+      writeFileHeader();
+      finalizeFileHeader();
+      key = new KeyBuffer(columnNumber);
+      value = new ValueBuffer(key);
+    }
+
+    /** Write the initial part of file header. */
+    void initializeFileHeader() throws IOException {
+      out.write(VERSION);
+    }
+
+    /** Write the final part of file header. */
+    void finalizeFileHeader() throws IOException {
+      out.write(sync); // write the sync bytes
+      out.flush(); // flush header
+    }
+
+    boolean isCompressed() {
+      return codec != null;
+    }
+
+    /** Write and flush the file header. */
+    void writeFileHeader() throws IOException {
+      Text.writeString(out, KeyBuffer.class.getName());
+      Text.writeString(out, ValueBuffer.class.getName());
+
+      out.writeBoolean(isCompressed());
+      out.writeBoolean(false);
+
+      if (isCompressed()) {
+        Text.writeString(out, (codec.getClass()).getName());
+      }
+      metadata.write(out);
+    }
+
+    void init(Path name, Configuration conf, FSDataOutputStream out,
+        CompressionCodec codec, Metadata metadata) throws IOException {
+      this.conf = conf;
+      this.out = out;
+      this.codec = codec;
+      this.metadata = metadata;
+      if (this.codec != null) {
+        ReflectionUtils.setConf(codec, this.conf);
+        compressor = CodecPool.getCompressor(codec);
+
+        compressionBuffer = new DataOutputBuffer[columnNumber];
+        deflateFilter = new CompressionOutputStream[columnNumber];
+        deflateOut = new DataOutputStream[columnNumber];
+        for (int i = 0; i < columnNumber; i++) {
+          compressionBuffer[i] = new DataOutputBuffer();
+          deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
+              compressor);
+          deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
+              deflateFilter[i]));
+        }
+        keyCompressor = CodecPool.getCompressor(codec);
+        keyCompressionBuffer = new DataOutputBuffer();
+        keyDeflateFilter = codec.createOutputStream(keyCompressionBuffer,
+            keyCompressor);
+        keyDeflateOut = new DataOutputStream(new BufferedOutputStream(
+            keyDeflateFilter));
+      }
+    }
+
+    /** Returns the compression codec of data in this file. */
+    public CompressionCodec getCompressionCodec() {
+      return codec;
+    }
+
+    /** create a sync point */
+    public synchronized void sync() throws IOException {
+      if (sync != null && lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE); // mark the start of the sync
+        out.write(sync); // write sync
+        lastSyncPos = out.getPos(); // update lastSyncPos
+      }
+    }
+
+    /** Returns the configuration of this file. */
+    Configuration getConf() {
+      return conf;
+    }
+
+    synchronized void checkAndWriteSync() throws IOException {
+      if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
+        sync();
+      }
+    }
+
+    private int columnBufferSize = 0;
+
+    /**
+     * Append a row of values. Currently it can only accept a
+     * {@link BytesRefArrayWritable}. If its <code>size()</code> is less than
+     * the column number in the file, zero bytes are appended for the empty
+     * columns. If its <code>size()</code> is greater than the column number in
+     * the file, the extra columns' bytes are ignored.
+     * 
+     * @param val
+     * @throws IOException
+     */
+    public synchronized void append(Writable val) throws IOException {
+
+      if (!(val instanceof BytesRefArrayWritable))
+        throw new UnsupportedOperationException(
+            "Currently the writer can only accept BytesRefArrayWritable");
+
+      BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
+      int size = columns.size();
+      for (int i = 0; i < size; i++) {
+        BytesRefWritable cu = columns.get(i);
+        int plainLen = cu.getLength();
+        columnBufferSize += plainLen;
+        columnValuePlainLength[i] += plainLen;
+        columnBuffers[i].append(cu);
+      }
+
+      if (size < columnNumber) {
+        for (int i = columns.size(); i < columnNumber; i++) {
+          columnBuffers[i].append(BytesRefWritable.ZeroBytesRefWritable);
+        }
+      }
+
+      bufferedRecords++;
+      if ((columnBufferSize > columnsBufferSize)
+          || (bufferedRecords >= RECORD_INTERVAL)) {
+        flushRecords();
+      }
+    }
+
+    synchronized void flushRecords() throws IOException {
+
+      key.numberRows = bufferedRecords;
+      value.keyBuffer = key;
+
+      int valueLength = 0;
+      for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
+        ColumnBuffer currentBuf = columnBuffers[columnIndex];
+
+        DataOutputBuffer columnValue = currentBuf.columnValBuffer;
+
+        if (isCompressed()) {
+          compressionBuffer[columnIndex].reset();
+          deflateFilter[columnIndex].resetState();
+          deflateOut[columnIndex].write(columnValue.getData(), 0, columnValue
+              .getLength());
+          deflateOut[columnIndex].flush();
+          deflateFilter[columnIndex].finish();
+          int colLen = compressionBuffer[columnIndex].getLength();
+          key.setColumnLenInfo(colLen, currentBuf.valLenBuffer,
+              columnValuePlainLength[columnIndex], columnIndex);
+          value.setColumnValueBuffer(compressionBuffer[columnIndex],
+              columnIndex);
+          valueLength += colLen;
+        } else {
+          int colLen = columnValuePlainLength[columnIndex];
+          key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, colLen,
+              columnIndex);
+          value.setColumnValueBuffer(columnValue, columnIndex);
+          valueLength += colLen;
+        }
+        columnValuePlainLength[columnIndex] = 0;
+      }
+
+      int keyLength = key.getSize();
+      if (keyLength < 0)
+        throw new IOException("negative length keys not allowed: " + key);
+
+      // Write the record out
+      checkAndWriteSync(); // sync
+      out.writeInt(keyLength + valueLength); // total record length
+      out.writeInt(keyLength); // key portion length
+      if (!isCompressed()) {
+        out.writeInt(keyLength);
+        key.write(out); // key
+      } else {
+        keyCompressionBuffer.reset();
+        keyDeflateFilter.resetState();
+        key.write(keyDeflateOut);
+        keyDeflateOut.flush();
+        keyDeflateFilter.finish();
+        int compressedKeyLen = keyCompressionBuffer.getLength();
+        out.writeInt(compressedKeyLen);
+        out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen);
+      }
+      value.write(out);// value
+
+      // clear the columnBuffers
+      clearColumnBuffers();
+
+      bufferedRecords = 0;
+      columnBufferSize = 0;
+    }
+
+    private void clearColumnBuffers() throws IOException {
+      for (int i = 0; i < columnNumber; i++) {
+        columnBuffers[i].clear();
+      }
+    }
+
+    public synchronized void close() throws IOException {
+      if (bufferedRecords > 0)
+        flushRecords();
+      clearColumnBuffers();
+
+      if (isCompressed()) {
+        for (int i = 0; i < columnNumber; i++) {
+          deflateFilter[i].close();
+          IOUtils.closeStream(deflateOut[i]);
+        }
+        keyDeflateFilter.close();
+        IOUtils.closeStream(keyDeflateOut);
+        CodecPool.returnCompressor(keyCompressor);
+        keyCompressor = null;
+        CodecPool.returnCompressor(compressor);
+        compressor = null;
+      }
+
+      if (out != null) {
+
+        // Close the underlying stream if we own it...
+        out.flush();
+        out.close();
+        out = null;
+      }
+    }
+  }
+
+  public static String SKIP_COLUMN_IDS_CONF_STR = "hive.io.rcfile.skipcolumn.ids";
+
+  /**
+   * Sets the ids (starting from zero) of the columns to skip for RCFile's
+   * Reader. Once a column is included in the list, RCFile's reader will simply
+   * skip its value.
+   * 
+   */
+  public static void setSkipColumnIDs(Configuration conf, int[] ids) {
+    String id = null;
+    if (ids != null) {
+      for (int i = 0; i < ids.length; i++) {
+        if (i == 0) {
+          id = "" + ids[i];
+        } else {
+          id = id + StringUtils.COMMA_STR + ids[i];
+        }
+      }
+    }
+
+    if (id == null || id.length() <= 0) {
+      conf.set(SKIP_COLUMN_IDS_CONF_STR, "");
+      return;
+    }
+
+    conf.set(SKIP_COLUMN_IDS_CONF_STR, id);
+  }
+
+  /**
+   * Returns the array of column ids (starting from zero) that is set in the
+   * given parameter <tt>conf</tt>.
+   */
+  public static int[] getSkipColumnIDs(Configuration conf) {
+    String skips = conf.get(SKIP_COLUMN_IDS_CONF_STR, "");
+    String[] list = StringUtils.split(skips);
+    int[] result = new int[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = Integer.parseInt(list[i]);
+    }
+    return result;
+  }
+
+  /**
+   * Clears the skip column ids set in the conf.
+   */
+  public static void clearSkipColumnIDs(Configuration conf) {
+    conf.set(SKIP_COLUMN_IDS_CONF_STR, "");
+  }
+
+  /**
+   * Read KeyBuffer/ValueBuffer pairs from a RCFile.
+   * 
+   */
+  public static class Reader {
+
+    private Path file;
+    private FSDataInputStream in;
+
+    private byte version;
+
+    private CompressionCodec codec = null;
+    private Metadata metadata = null;
+
+    private byte[] sync = new byte[SYNC_HASH_SIZE];
+    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+    private boolean syncSeen;
+
+    private long end;
+    private int currentKeyLength;
+    private int currentRecordLength;
+
+    private Configuration conf;
+
+    private ValueBuffer currentValue;
+
+    private boolean[] skippedColIDs = null;
+
+    private int readRowsIndexInBuffer = 0;
+
+    private int recordsNumInValBuffer = 0;
+
+    private int columnNumber = 0;
+
+    private int loadColumnNum;
+
+    private int passedRowsNum = 0;
+
+    private int[] columnRowReadIndex = null;
+    private DataInputBuffer[] colValLenBufferReadIn;
+    private boolean decompress = false;
+
+    private Decompressor keyDecompressor;
+    DataOutputBuffer keyDecompressedData = new DataOutputBuffer();
+
+    /** Create a new RCFile reader. */
+    public Reader(FileSystem fs, Path file, Configuration conf)
+        throws IOException {
+      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, 0, fs
+          .getFileStatus(file).getLen());
+    }
+
+    /** Create a new RCFile reader. */
+    public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
+        long start, long length) throws IOException {
+      conf.setInt("io.file.buffer.size", bufferSize);
+      this.file = file;
+      in = fs.open(file, bufferSize);
+      this.conf = conf;
+      end = start + length;
+      if (start > 0) {
+        seek(0);
+        init();
+        seek(start);
+      } else {
+        init();
+      }
+      columnNumber = Integer.parseInt(metadata.get(
+          new Text(COLUMN_NUMBER_METADATA_STR)).toString());
+
+      int[] skipIDs = getSkipColumnIDs(conf);
+      skippedColIDs = new boolean[columnNumber];
+      for (int i = 0; i < skippedColIDs.length; i++) {
+        skippedColIDs[i] = false;
+      }
+      for (int skip : skipIDs) {
+        if (skip < columnNumber)
+          skippedColIDs[skip] = true;
+      }
+
+      loadColumnNum = columnNumber;
+      if (skippedColIDs != null && skippedColIDs.length > 0) {
+        for (int i = 0; i < skippedColIDs.length; i++) {
+          if (skippedColIDs[i])
+            loadColumnNum -= 1;
+        }
+      }
+
+      colValLenBufferReadIn = new DataInputBuffer[columnNumber];
+      columnRowReadIndex = new int[columnNumber];
+      for (int i = 0; i < columnNumber; i++) {
+        columnRowReadIndex[i] = 0;
+        if (!skippedColIDs[i])
+          colValLenBufferReadIn[i] = new DataInputBuffer();
+      }
+
+      currentKey = createKeyBuffer();
+      currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
+    }
+
+    private void init() throws IOException {
+      byte[] versionBlock = new byte[VERSION.length];
+      in.readFully(versionBlock);
+
+      if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1])
+          || (versionBlock[2] != VERSION[2]))
+        throw new IOException(file + " not a RCFile");
+
+      // Set 'version'
+      version = versionBlock[3];
+      if (version > VERSION[3])
+        throw new VersionMismatchException(VERSION[3], version);
+
+      try {
+        Class<?> keyCls = Class.forName(Text.readString(in));
+        Class<?> valCls = Class.forName(Text.readString(in));
+        if (!keyCls.equals(KeyBuffer.class)
+            || !valCls.equals(ValueBuffer.class))
+          throw new IOException(file + " not a RCFile");
+      } catch (ClassNotFoundException e) {
+        throw new IOException(file + " not a RCFile", e);
+      }
+
+      if (version > 2) { // if version > 2
+        decompress = in.readBoolean(); // is compressed?
+      } else {
+        decompress = false;
+      }
+
+      // is block-compressed? It should always be false.
+      boolean blkCompressed = in.readBoolean();
+      if (blkCompressed) {
+        throw new IOException(file + " not a RCFile.");
+      }
+
+      // setup the compression codec
+      if (decompress) {
+        String codecClassname = Text.readString(in);
+        try {
+          Class<? extends CompressionCodec> codecClass = conf.getClassByName(
+              codecClassname).asSubclass(CompressionCodec.class);
+          codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
+              conf);
+        } catch (ClassNotFoundException cnfe) {
+          throw new IllegalArgumentException(
+              "Unknown codec: " + codecClassname, cnfe);
+        }
+        keyDecompressor = CodecPool.getDecompressor(codec);
+      }
+
+      metadata = new Metadata();
+      if (version >= VERSION_WITH_METADATA) { // if version >= 6
+        metadata.readFields(in);
+      }
+
+      if (version > 1) { // if version > 1
+        in.readFully(sync); // read sync bytes
+      }
+    }
+
+    /** Return the current byte position in the input file. */
+    public synchronized long getPosition() throws IOException {
+      return in.getPos();
+    }
+
+    /**
+     * Set the current byte position in the input file.
+     * 
+     * <p>
+     * The position passed must be a position returned by
+     * {@link RCFile.Writer#getLength()} when writing this file. In other
+     * words, the current seek can only seek to the end of the file; to seek to
+     * an arbitrary position, use {@link RCFile.Reader#sync(long)}.
+     */
+    public synchronized void seek(long position) throws IOException {
+      in.seek(position);
+    }
+
+    /** Seek to the next sync mark past a given position. */
+    public synchronized void sync(long position) throws IOException {
+      if (position + SYNC_SIZE >= end) {
+        seek(end);
+        return;
+      }
+
+      try {
+        seek(position + 4); // skip escape
+        in.readFully(syncCheck);
+        int syncLen = sync.length;
+        for (int i = 0; in.getPos() < end; i++) {
+          int j = 0;
+          for (; j < syncLen; j++) {
+            if (sync[j] != syncCheck[(i + j) % syncLen])
+              break;
+          }
+          if (j == syncLen) {
+            in.seek(in.getPos() - SYNC_SIZE); // position before
+            // sync
+            return;
+          }
+          syncCheck[i % syncLen] = in.readByte();
+        }
+      } catch (ChecksumException e) { // checksum failure
+        handleChecksumException(e);
+      }
+    }
+
+    private void handleChecksumException(ChecksumException e)
+        throws IOException {
+      if (conf.getBoolean("io.skip.checksum.errors", false)) {
+        LOG.warn("Bad checksum at " + getPosition() + ". Skipping entries.");
+        sync(getPosition() + conf.getInt("io.bytes.per.checksum", 512));
+      } else {
+        throw e;
+      }
+    }
+
+    private KeyBuffer createKeyBuffer() {
+      return new KeyBuffer(columnNumber);
+    }
+
+    @SuppressWarnings("unused")
+    private ValueBuffer createValueBuffer(KeyBuffer key) throws IOException {
+      return new ValueBuffer(key, skippedColIDs);
+    }
+
+    /**
+     * Read and return the next record length, potentially skipping over a sync
+     * block.
+     * 
+     * @return the length of the next record or -1 if there is no next record
+     * @throws IOException
+     */
+    private synchronized int readRecordLength() throws IOException {
+      if (in.getPos() >= end) {
+        return -1;
+      }
+      int length = in.readInt();
+      if (version > 1 && sync != null && length == SYNC_ESCAPE) { // process
+        // a
+        // sync entry
+        in.readFully(syncCheck); // read syncCheck
+        if (!Arrays.equals(sync, syncCheck)) // check it
+          throw new IOException("File is corrupt!");
+        syncSeen = true;
+        if (in.getPos() >= end) {
+          return -1;
+        }
+        length = in.readInt(); // re-read length
+      } else {
+        syncSeen = false;
+      }
+      return length;
+    }
+
+    private void seekToNextKeyBuffer() throws IOException {
+      if (!keyInit)
+        return;
+      if (!currentValue.inited) {
+        in.skip(currentRecordLength - currentKeyLength);
+      }
+    }
+
+    private int compressedKeyLen = 0;
+    DataInputBuffer keyDataIn = new DataInputBuffer();
+    DataInputBuffer keyDecompressBuffer = new DataInputBuffer();
+    DataOutputBuffer keyTempBuffer = new DataOutputBuffer();
+
+    KeyBuffer currentKey = null;
+    boolean keyInit = false;
+
+    protected int nextKeyBuffer() throws IOException {
+      seekToNextKeyBuffer();
+      currentRecordLength = readRecordLength();
+      if (currentRecordLength == -1) {
+        keyInit = false;
+        return -1;
+      }
+      currentKeyLength = in.readInt();
+      compressedKeyLen = in.readInt();
+      if (decompress) {
+        keyTempBuffer.reset();
+        keyTempBuffer.write(in, compressedKeyLen);
+        keyDecompressBuffer.reset(keyTempBuffer.getData(), compressedKeyLen);
+        CompressionInputStream deflatFilter = codec.createInputStream(
+            keyDecompressBuffer, keyDecompressor);
+        DataInputStream compressedIn = new DataInputStream(deflatFilter);
+        deflatFilter.resetState();
+        keyDecompressedData.reset();
+        keyDecompressedData.write(compressedIn, currentKeyLength);
+        keyDataIn.reset(keyDecompressedData.getData(), currentKeyLength);
+        currentKey.readFields(keyDataIn);
+      } else {
+        currentKey.readFields(in);
+      }
+
+      keyInit = true;
+      currentValue.inited = false;
+
+      readRowsIndexInBuffer = 0;
+      recordsNumInValBuffer = currentKey.numberRows;
+
+      for (int i = 0; i < columnNumber; i++) {
+        if (skippedColIDs[i])
+          continue;
+        colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
+            .getData(), currentKey.allCellValLenBuffer[i].getLength());
+        columnRowReadIndex[i] = 0;
+      }
+      return currentKeyLength;
+    }
+
+    protected void currentValueBuffer() throws IOException {
+      if (!keyInit)
+        nextKeyBuffer();
+      currentValue.keyBuffer = currentKey;
+      currentValue.clearColumnBuffer();
+      currentValue.readFields(in);
+      currentValue.inited = true;
+    }
+
+    private boolean rowFetched = false;
+
+    // use this buffer to hold a column's cell value lengths for use in
+    // getColumn(), instead of using colValLenBufferReadIn directly.
+    private DataInputBuffer fetchColumnTempBuf = new DataInputBuffer();
+
+    /**
+     * Fetch all data in the buffer for a given column. This is useful for
+     * columnar operators, which perform operations on an array of data of one
+     * column. It should be used together with {@link #nextColumnsBatch()}.
+     * Calling getColumn() will not change the result of
+     * {@link #next(LongWritable)} and
+     * {@link #getCurrentRow(BytesRefArrayWritable)}.
+     * 
+     * @param columnID
+     * @throws IOException
+     */
+    public BytesRefArrayWritable getColumn(int columnID,
+        BytesRefArrayWritable rest) throws IOException {
+
+      if (skippedColIDs[columnID]) {
+        return null;
+      }
+
+      if (rest == null) {
+        rest = new BytesRefArrayWritable();
+      }
+
+      rest.resetValid(recordsNumInValBuffer);
+
+      if (!currentValue.inited)
+        currentValueBuffer();
+
+      int columnNextRowStart = 0;
+      fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
+          .getData(), currentKey.allCellValLenBuffer[columnID].getLength());
+      for (int i = 0; i < recordsNumInValBuffer; i++) {
+        int length = WritableUtils.readVInt(fetchColumnTempBuf);
+
+        BytesRefWritable currentCell = rest.get(i);
+        currentCell.set(currentValue.loadedColumnsValueBuffer[columnID]
+            .getData(), columnNextRowStart, length);
+        columnNextRowStart = columnNextRowStart + length;
+      }
+      return rest;
+    }
+
+    /**
+     * Read in the next key buffer and throw away any data in the current key
+     * buffer and current value buffer. It will influence the result of
+     * {@link #next(LongWritable)} and {@link #getCurrentRow(BytesRefArrayWritable)}.
+     * 
+     * @return whether there are still records left or not
+     * @throws IOException
+     */
+    public synchronized boolean nextColumnsBatch() throws IOException {
+      passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
+      return nextKeyBuffer() > 0;
+    }
+
+    /**
+     * Sets <tt>readRows</tt> to how many rows have been fetched with next(); it
+     * only counts rows read by next(). This may be smaller than the actual
+     * number of rows passed by, because {@link #seek(long)} and
+     * {@link #nextColumnsBatch()} can change the underlying key buffer and
+     * value buffer.
+     * 
+     * @return true if the next row was fetched; false if there are no more rows
+     * @throws IOException
+     */
+    public synchronized boolean next(LongWritable readRows) throws IOException {
+      if (hasRecordsInBuffer()) {
+        readRows.set(passedRowsNum);
+        readRowsIndexInBuffer++;
+        passedRowsNum++;
+        rowFetched = false;
+        return true;
+      } else {
+        keyInit = false;
+      }
+
+      int ret = -1;
+      try {
+        ret = nextKeyBuffer();
+      } catch (EOFException eof) {
+        eof.printStackTrace();
+      }
+      if (ret > 0) {
+        return next(readRows);
+      }
+      return false;
+    }
+
+    public boolean hasRecordsInBuffer() {
+      return readRowsIndexInBuffer < recordsNumInValBuffer;
+    }
+
+    /**
+     * Get the current row; make sure {@link #next(LongWritable)} has been
+     * called first.
+     * 
+     * @throws IOException
+     */
+    public synchronized void getCurrentRow(BytesRefArrayWritable ret)
+        throws IOException {
+
+      if (!keyInit || rowFetched)
+        return;
+
+      if (!currentValue.inited) {
+        currentValueBuffer();
+      }
+
+      // we do not use BytesWritable here to avoid the byte-copy from
+      // DataOutputStream to BytesWritable
+
+      ret.resetValid(columnNumber);
+
+      for (int i = 0, readIndex = 0; i < columnNumber; i++) {
+        BytesRefWritable ref = ret.unCheckedGet(i);
+
+        if (skippedColIDs[i]) {
+          if (ref != BytesRefWritable.ZeroBytesRefWritable)
+            ret.set(i, BytesRefWritable.ZeroBytesRefWritable);
+          continue;
+        }
+
+        int columnCurrentRowStart = (int) columnRowReadIndex[i];
+        int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
+        columnRowReadIndex[i] = columnCurrentRowStart + length;
+
+        ref.set(currentValue.loadedColumnsValueBuffer[readIndex].getData(),
+            columnCurrentRowStart, length);
+        readIndex++;
+      }
+      rowFetched = true;
+    }
+
+    /** Returns true iff the previous call to next passed a sync mark. */
+    public boolean syncSeen() {
+      return syncSeen;
+    }
+
+    /** Returns the name of the file. */
+    public String toString() {
+      return file.toString();
+    }
+
+    /** Close the reader. */
+    public void close() {
+      IOUtils.closeStream(in);
+      currentValue.close();
+      if (this.decompress) {
+        IOUtils.closeStream(keyDecompressedData);
+        CodecPool.returnDecompressor(keyDecompressor);
+      }
+    }
+  }
+}
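
A minimal usage sketch (not part of this commit) of how a columnar consumer might drive the Reader API above. Only calls that appear in the patch are used; the class name, file path, and column index are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;

public class RCFileColumnScanSketch {                     // hypothetical class name
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.rc");              // hypothetical path
    FileSystem fs = file.getFileSystem(conf);
    RCFile.Reader reader = new RCFile.Reader(fs, file, conf);
    BytesRefArrayWritable column = new BytesRefArrayWritable();
    // nextColumnsBatch() loads the next row group's key buffer; getColumn()
    // then exposes every cell of one column in that group without affecting
    // next()/getCurrentRow().
    while (reader.nextColumnsBatch()) {
      column = reader.getColumn(1, column);               // hypothetical column id
      // ... operate on the cells held in 'column' ...
    }
    reader.close();
  }
}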

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java?rev=770548&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java Fri May  1 06:31:30 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>
+    extends FileInputFormat<K, V> {
+
+  public RCFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+  }
+
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+
+    reporter.setStatus(split.toString());
+
+    return new RCFileRecordReader(job, (FileSplit) split);
+  }
+}
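
A minimal sketch (not part of this commit) of plugging the input format above into an old-style MapReduce job configuration; the class name and input directory are hypothetical.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class RCFileJobWiringSketch {                      // hypothetical class name
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Keys handed to the mapper are LongWritable row numbers and values are
    // BytesRefArrayWritable rows, as declared by RCFileInputFormat.
    job.setInputFormat(RCFileInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/rcfile-input"));  // hypothetical path
    // ... set mapper/reducer and output format, then submit the job ...
  }
}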

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=770548&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Fri May  1 06:31:30 2009
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class RCFileOutputFormat extends
+    FileOutputFormat<Void, BytesRefArrayWritable> implements
+    HiveOutputFormat<WritableComparable, Writable> {
+
+  /**
+   * Set the number of columns in the given configuration.
+   * 
+   * @param conf
+   *          configuration instance in which to set the column number
+   * @param columnNum
+   *          number of columns for RCFile's Writer
+   * 
+   */
+  public static void setColumnNumber(Configuration conf, int columnNum) {
+    assert columnNum > 0;
+    conf.setInt(RCFile.COLUMN_NUMBER_CONF_STR, columnNum);
+  }
+
+  /**
+   * Returns the number of columns set in the given configuration for writers.
+   * 
+   * @param conf
+   *          configuration instance to read the column number from
+   * @return number of columns for RCFile's writer
+   */
+  public static int getColumnNumber(Configuration conf) {
+    return conf.getInt(RCFile.COLUMN_NUMBER_CONF_STR, 0);
+  }
+
+  /** {@inheritDoc} */
+  public RecordWriter<Void, BytesRefArrayWritable> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+
+    Path outputPath = getWorkOutputPath(job);
+    FileSystem fs = outputPath.getFileSystem(job);
+    if (!fs.exists(outputPath)) {
+      throw new IOException("Output directory doesn't exist");
+    }
+    Path file = new Path(outputPath, name);
+    CompressionCodec codec = null;
+    if (getCompressOutput(job)) {
+      Class<?> codecClass = getOutputCompressorClass(job, DefaultCodec.class);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
+    }
+    final RCFile.Writer out = new RCFile.Writer(fs, job, file, progress, codec);
+
+    return new RecordWriter<Void, BytesRefArrayWritable>() {
+
+      @Override
+      public void close(Reporter reporter) throws IOException {
+        out.close();
+      }
+
+      @Override
+      public void write(Void key, BytesRefArrayWritable value)
+          throws IOException {
+        out.append(value);
+      }
+    };
+  }
+
+  /**
+   * Create the final output file.
+   * 
+   * @param jc
+   *          the job configuration
+   * @param finalOutPath
+   *          the final output file to be created
+   * @param valueClass
+   *          the value class used to create the writer
+   * @param isCompressed
+   *          whether the content is compressed or not
+   * @param tableProperties
+   *          the table properties of this file's corresponding table
+   * @param progress
+   *          progress callback used for status reporting
+   * @throws IOException
+   */
+  @Override
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+      JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
+      boolean isCompressed, Properties tableProperties, Progressable progress)
+      throws IOException {
+
+    String[] cols = null;
+    String columns = tableProperties.getProperty("columns");
+    if (columns == null || columns.trim().equals(""))
+      cols = new String[0];
+    else
+      cols = StringUtils.split(columns, ",");
+
+    RCFileOutputFormat.setColumnNumber(jc, cols.length);
+    final RCFile.Writer outWriter = Utilities.createRCFileWriter(jc, FileSystem
+        .get(jc), finalOutPath, isCompressed);
+
+    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+      public void write(Writable r) throws IOException {
+        outWriter.append(r);
+      }
+
+      public void close(boolean abort) throws IOException {
+        outWriter.close();
+      }
+    };
+  }
+}
\ No newline at end of file
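
A minimal sketch (not part of this commit) of writing one row through the helpers this output format relies on; the class name, output path, column count, and cell contents are hypothetical, and the writer is obtained through the Utilities.createRCFileWriter call referenced above.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.mapred.JobConf;

public class RCFileWriteSketch {                          // hypothetical class name
  public static void main(String[] args) throws Exception {
    JobConf jc = new JobConf();
    int columnNumber = 3;                                 // hypothetical column count
    // The writer reads the column count back out of the configuration,
    // so it has to be set before the writer is created.
    RCFileOutputFormat.setColumnNumber(jc, columnNumber);
    Path out = new Path("/tmp/example.rc");               // hypothetical path
    RCFile.Writer writer =
        Utilities.createRCFileWriter(jc, FileSystem.get(jc), out, false /* not compressed */);
    BytesRefArrayWritable row = new BytesRefArrayWritable();
    row.resetValid(columnNumber);
    for (int i = 0; i < columnNumber; i++) {
      byte[] cell = ("cell-" + i).getBytes("UTF-8");      // hypothetical cell contents
      row.get(i).set(cell, 0, cell.length);
    }
    writer.append(row);                                   // append one row of columnNumber cells
    writer.close();
  }
}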

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java?rev=770548&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java Fri May  1 06:31:30 2009
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>
+    implements RecordReader<LongWritable, BytesRefArrayWritable> {
+
+  private Reader in;
+  private long start;
+  private long end;
+  private boolean more = true;
+  protected Configuration conf;
+
+  public RCFileRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new RCFile.Reader(fs, path, conf);
+    this.end = split.getStart() + split.getLength();
+    this.conf = conf;
+
+    if (split.getStart() > in.getPosition())
+      in.sync(split.getStart()); // sync to start
+
+    this.start = in.getPosition();
+    more = start < end;
+  }
+
+  public Class<?> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  public Class<?> getValueClass() {
+    return BytesRefArrayWritable.class;
+  }
+
+  public LongWritable createKey() {
+    return (LongWritable) ReflectionUtils.newInstance(getKeyClass(), conf);
+  }
+
+  public BytesRefArrayWritable createValue() {
+    return (BytesRefArrayWritable) ReflectionUtils.newInstance(getValueClass(),
+        conf);
+  }
+
+  private boolean firstSeen = true;
+
+  @Override
+  public boolean next(LongWritable key, BytesRefArrayWritable value)
+      throws IOException {
+    if (!more)
+      return false;
+    long pos = in.getPosition();
+    boolean hasMore = in.next(key);
+    if (hasMore) {
+      in.getCurrentRow(value);
+    }
+    if (pos >= end && in.syncSeen() && !in.hasRecordsInBuffer()) {
+      more = false;
+      if (firstSeen) {
+        firstSeen = false;
+        return true;
+      }
+    } else {
+      more = hasMore;
+    }
+    return more;
+  }
+
+  protected boolean next(LongWritable key) throws IOException {
+    if (!more)
+      return false;
+    long pos = in.getPosition();
+    boolean hasMore = in.next(key);
+    if (pos >= end && in.syncSeen()) {
+      more = false;
+    } else {
+      more = hasMore;
+    }
+    return more;
+  }
+
+  /**
+   * Return the progress within the input split
+   * 
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+    }
+  }
+
+  public long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  protected void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  public void close() throws IOException {
+    in.close();
+  }
+}
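
A minimal sketch (not part of this commit) of pulling rows back through the record reader above outside of a job; the class name and path are hypothetical, and the single split simply covers the whole file (a real job would take its splits from RCFileInputFormat).

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

public class RCFileScanSketch {                           // hypothetical class name
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    Path file = new Path("/tmp/example.rc");              // hypothetical path
    FileSystem fs = file.getFileSystem(conf);
    long length = fs.getFileStatus(file).getLen();
    FileSplit split = new FileSplit(file, 0, length, new String[0]);
    RCFileRecordReader<LongWritable, BytesRefArrayWritable> reader =
        new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(conf, split);
    LongWritable rowID = reader.createKey();
    BytesRefArrayWritable row = reader.createValue();
    while (reader.next(rowID, row)) {
      // 'row' holds one BytesRefWritable per column of the current row.
    }
    reader.close();
  }
}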

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=770548&r1=770547&r2=770548&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Fri May  1 06:31:30 2009
@@ -738,7 +738,7 @@
    * Replaces files in the partition with new data set specifed by srcf. Works by moving files
    *
    * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
-   * @param dest The directory where the final data needs to go
+   * @param destf The directory where the final data needs to go
    * @param fs The filesystem handle
    * @param tmppath Temporary directory
    */

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=770548&r1=770547&r2=770548&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Fri May  1 06:31:30 2009
@@ -43,6 +43,8 @@
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.alterTableDesc;
 import org.apache.hadoop.hive.ql.plan.createTableDesc;
@@ -55,6 +57,7 @@
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.ql.plan.fetchWork;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -83,7 +86,11 @@
   private static final String TEXTFILE_OUTPUT = IgnoreKeyTextOutputFormat.class.getName();
   private static final String SEQUENCEFILE_INPUT = SequenceFileInputFormat.class.getName();
   private static final String SEQUENCEFILE_OUTPUT = SequenceFileOutputFormat.class.getName();
+  private static final String RCFILE_INPUT = RCFileInputFormat.class.getName();
+  private static final String RCFILE_OUTPUT = RCFileOutputFormat.class.getName();
 
+  private static final String COLUMNAR_SERDE = ColumnarSerDe.class.getName();
+  
   public static String getTypeName(int token) {
     return TokenToTypeName.get(token);
   }
@@ -234,6 +241,11 @@
           inputFormat = TEXTFILE_INPUT;
           outputFormat = TEXTFILE_OUTPUT;
           break;
+        case HiveParser.TOK_TBLRCFILE:
+          inputFormat = RCFILE_INPUT;
+          outputFormat = RCFILE_OUTPUT;
+          serde = COLUMNAR_SERDE;
+          break;
         case HiveParser.TOK_TABLEFILEFORMAT:
           inputFormat = unescapeSQLString(child.getChild(0).getText());
           outputFormat = unescapeSQLString(child.getChild(1).getText());
@@ -244,7 +256,6 @@
         default: assert false;
       }
     }
-
     createTableDesc crtTblDesc = 
       new createTableDesc(tableName, isExt, cols, partCols, bucketCols, 
                           sortCols, numBuckets,

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=770548&r1=770547&r2=770548&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Fri May  1 06:31:30 2009
@@ -106,6 +106,7 @@
 TOK_TABLEROWFORMATLINES;
 TOK_TBLSEQUENCEFILE;
 TOK_TBLTEXTFILE;
+TOK_TBLRCFILE;
 TOK_TABLEFILEFORMAT;
 TOK_TABCOLNAME;
 TOK_TABLELOCATION;
@@ -412,6 +413,7 @@
     :
       KW_STORED KW_AS KW_SEQUENCEFILE  -> TOK_TBLSEQUENCEFILE
       | KW_STORED KW_AS KW_TEXTFILE  -> TOK_TBLTEXTFILE
+      | KW_STORED KW_AS KW_RCFILE  -> TOK_TBLRCFILE
       | KW_STORED KW_AS KW_INPUTFORMAT inFmt=StringLiteral KW_OUTPUTFORMAT outFmt=StringLiteral
       -> ^(TOK_TABLEFILEFORMAT $inFmt $outFmt)
     ;
@@ -1154,6 +1156,7 @@
 KW_STORED: 'STORED';
 KW_SEQUENCEFILE: 'SEQUENCEFILE';
 KW_TEXTFILE: 'TEXTFILE';
+KW_RCFILE: 'RCFILE';
 KW_INPUTFORMAT: 'INPUTFORMAT';
 KW_OUTPUTFORMAT: 'OUTPUTFORMAT';
 KW_LOCATION: 'LOCATION';


