hive-commits mailing list archives

From: prasan...@apache.org
Subject: svn commit: r1661271 [7/39] - in /hive/branches/llap: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/serde/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/ accumulo-...
Date: Sat, 21 Feb 2015 02:37:03 GMT
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Sat Feb 21 02:36:54 2015
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
+import org.apache.hadoop.hive.ql.io.filters.BloomFilter;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -103,7 +106,10 @@ public class RecordReaderImpl implements
   List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
+  private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
   private final SargApplier sargApp;
+  // same as the above array, but indices are set to true
+  private final boolean[] sargColumns;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
   private final Configuration conf;
@@ -112,6 +118,27 @@ public class RecordReaderImpl implements
   private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
   private final ZeroCopyReaderShim zcr;
 
+  public final static class Index {
+    OrcProto.RowIndex[] rowGroupIndex;
+    OrcProto.BloomFilterIndex[] bloomFilterIndex;
+
+    Index(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
+      this.rowGroupIndex = rgIndex;
+      this.bloomFilterIndex = bfIndex;
+    }
+
+    public OrcProto.RowIndex[] getRowGroupIndex() {
+      return rowGroupIndex;
+    }
+
+    public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
+      return bloomFilterIndex;
+    }
+
+    public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
+      this.rowGroupIndex = rowGroupIndex;
+    }
+  }
 
   /**
    * Given a list of column names, find the given column and return the index.
@@ -139,7 +166,7 @@ public class RecordReaderImpl implements
    *                   result
    * @return an array mapping the sarg leaves to concrete column numbers
    */
-  static int[] mapSargColumns(List<PredicateLeaf> sargLeaves,
+  public static int[] mapSargColumns(List<PredicateLeaf> sargLeaves,
                              String[] columnNames,
                              int rootColumn) {
     int[] result = new int[sargLeaves.size()];
@@ -172,9 +199,18 @@ public class RecordReaderImpl implements
     this.metadata = new MetadataReader(file, codec, bufferSize, types.size());
     SearchArgument sarg = options.getSearchArgument();
     if (sarg != null && strideRate != 0) {
-      sargApp = new SargApplier(sarg, options.getColumnNames(), strideRate);
+      sargApp = new SargApplier(sarg, options.getColumnNames(), strideRate, types);
+      // included will not be null, row options will fill the array with trues if null
+      sargColumns = new boolean[included.length];
+      for (int i : sargApp.filterColumns) {
+        // filter columns may have -1 as index which could be partition column in SARG.
+        if (i > 0) {
+          sargColumns[i] = true;
+        }
+      }
     } else {
       sargApp = null;
+      sargColumns = null;
     }
     long rows = 0;
     long skippedRows = 0;
@@ -199,6 +235,7 @@ public class RecordReaderImpl implements
     boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
     reader = createTreeReader(0, types, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
+    bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
   }
 
@@ -1502,7 +1539,7 @@ public class RecordReaderImpl implements
         }
         len -= bytesRead;
         offset += bytesRead;
-      } 
+      }
 
       return allBytes;
     }
@@ -1905,7 +1942,7 @@ public class RecordReaderImpl implements
           }
         }
       } else {
-        if (result.noNulls){ 
+        if (result.noNulls){
           for (int i = 0; i < batchSize; i++) {
             adjustedDownLen = StringExpr.rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i], maxLength);
             if (adjustedDownLen < result.length[i]) {
@@ -1974,7 +2011,7 @@ public class RecordReaderImpl implements
           }
         }
       } else {
-        if (result.noNulls){ 
+        if (result.noNulls){
           for (int i = 0; i < batchSize; i++) {
             adjustedDownLen = StringExpr.truncate(result.vector[i], result.start[i], result.length[i], maxLength);
             if (adjustedDownLen < result.length[i]) {
@@ -2539,15 +2576,20 @@ public class RecordReaderImpl implements
    * that is referenced in the predicate.
    * @param statsProto the statistics for the column mentioned in the predicate
    * @param predicate the leaf predicate we need to evaluation
+   * @param bloomFilter
    * @return the set of truth values that may be returned for the given
    *   predicate.
    */
-  static TruthValue evaluatePredicate(OrcProto.ColumnStatistics statsProto,
-                                      PredicateLeaf predicate) {
+  static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
+      PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
     ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
     Object minValue = getMin(cs);
     Object maxValue = getMax(cs);
-    return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull());
+    BloomFilter bf = null;
+    if (bloomFilter != null) {
+      bf = new BloomFilter(bloomFilter);
+    }
+    return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
   }
 
   /**
@@ -2559,14 +2601,14 @@ public class RecordReaderImpl implements
    *   predicate.
    */
   static TruthValue evaluatePredicate(ColumnStatistics stats,
-      PredicateLeaf predicate) {
+      PredicateLeaf predicate, BloomFilter bloomFilter) {
     Object minValue = getMin(stats);
     Object maxValue = getMax(stats);
-    return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull());
+    return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
   }
 
   static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
-      Object max, boolean hasNull) {
+      Object max, boolean hasNull, BloomFilter bloomFilter) {
     // if we didn't have any values, everything must have been null
     if (min == null) {
       if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
@@ -2576,21 +2618,42 @@ public class RecordReaderImpl implements
       }
     }
 
-    Location loc;
+    TruthValue result;
+    // Predicate object and stats object can be one of the following base types
+    // LONG, DOUBLE, STRING, DATE, DECIMAL
+    // Out of these DATE is not implicitly convertible to other types and rest
+    // others are implicitly convertible. In cases where DATE cannot be converted
+    // the stats object is converted to text and comparison is performed.
+    // When STRINGs are converted to other base types, NumberFormat exception
+    // can occur in which case TruthValue.YES_NO_NULL value is returned
     try {
-      // Predicate object and stats object can be one of the following base types
-      // LONG, DOUBLE, STRING, DATE, DECIMAL
-      // Out of these DATE is not implicitly convertible to other types and rest
-      // others are implicitly convertible. In cases where DATE cannot be converted
-      // the stats object is converted to text and comparison is performed.
-      // When STRINGs are converted to other base types, NumberFormat exception
-      // can occur in which case TruthValue.YES_NO_NULL value is returned
       Object baseObj = predicate.getLiteral(PredicateLeaf.FileFormat.ORC);
       Object minValue = getConvertedStatsObj(min, baseObj);
       Object maxValue = getConvertedStatsObj(max, baseObj);
       Object predObj = getBaseObjectForComparison(baseObj, minValue);
 
-      switch (predicate.getOperator()) {
+      result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
+      if (bloomFilter != null && result != TruthValue.NO_NULL && result != TruthValue.NO) {
+        result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
+      }
+      // in case failed conversion, return the default YES_NO_NULL truth value
+    } catch (NumberFormatException nfe) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("NumberFormatException when type matching predicate object" +
+            " and statistics object. Exception: " + ExceptionUtils.getStackTrace(nfe));
+      }
+      result = hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    }
+    return result;
+  }
+
+  private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
+      Object minValue,
+      Object maxValue,
+      boolean hasNull) {
+    Location loc;
+
+    switch (predicate.getOperator()) {
       case NULL_SAFE_EQUALS:
         loc = compareToRange((Comparable) predObj, minValue, maxValue);
         if (loc == Location.BEFORE || loc == Location.AFTER) {
@@ -2675,12 +2738,81 @@ public class RecordReaderImpl implements
         return hasNull ? TruthValue.YES_NO : TruthValue.NO;
       default:
         return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    }
+  }
+
+  private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate, Object predObj,
+      BloomFilter bloomFilter, boolean hasNull) {
+    switch (predicate.getOperator()) {
+      case NULL_SAFE_EQUALS:
+        // null safe equals does not return *_NULL variant. So set hasNull to false
+        return checkInBloomFilter(bloomFilter, predObj, false);
+      case EQUALS:
+        return checkInBloomFilter(bloomFilter, predObj, hasNull);
+      case IN:
+        for (Object arg : predicate.getLiteralList(PredicateLeaf.FileFormat.ORC)) {
+          // if atleast one value in IN list exist in bloom filter, qualify the row group/stripe
+          TruthValue result = checkInBloomFilter(bloomFilter, arg, hasNull);
+          if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
+            return result;
+          }
+        }
+        return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+      default:
+        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    }
+  }
+
+  private static TruthValue checkInBloomFilter(BloomFilter bf, Object predObj, boolean hasNull) {
+    TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+
+    if (predObj instanceof Long) {
+      if (bf.testLong(((Long) predObj).longValue())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Double) {
+      if (bf.testDouble(((Double) predObj).doubleValue())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof String || predObj instanceof Text ||
+        predObj instanceof HiveDecimal || predObj instanceof BigDecimal) {
+      if (bf.testString(predObj.toString())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Date) {
+      if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof DateWritable) {
+      if (bf.testLong(((DateWritable) predObj).getDays())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Timestamp) {
+      if (bf.testLong(((Timestamp) predObj).getTime())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof TimestampWritable) {
+      if (bf.testLong(((TimestampWritable) predObj).getTimestamp().getTime())) {
+        result = TruthValue.YES_NO_NULL;
       }
+    } else {
+      // if the predicate object is null and if hasNull says there are no nulls then return NO
+      if (predObj == null && !hasNull) {
+        result = TruthValue.NO;
+      } else {
+        result = TruthValue.YES_NO_NULL;
+      }
+    }
 
-      // in case failed conversion, return the default YES_NO_NULL truth value
-    } catch (NumberFormatException nfe) {
-      return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    if (result == TruthValue.YES_NO_NULL && !hasNull) {
+      result = TruthValue.YES_NO;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Bloom filter evaluation: " + result.toString());
     }
+
+    return result;
   }
 
   private static Object getBaseObjectForComparison(Object predObj, Object statsObj) {
@@ -2742,18 +2874,22 @@ public class RecordReaderImpl implements
     private final List<PredicateLeaf> sargLeaves;
     private final int[] filterColumns;
     private final long rowIndexStride;
+    private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
 
-    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride) {
+    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
+        List<OrcProto.Type> types) {
       this.sarg = sarg;
       sargLeaves = sarg.getLeaves();
       filterColumns = mapSargColumns(sargLeaves, columnNames, 0);
+      bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
       this.rowIndexStride = rowIndexStride;
     }
 
     /**
      * Pick the row groups that we need to load from the current stripe.
+     *
      * @return an array with a boolean for each row group or null if all of the
-     *    row groups must be read.
+     * row groups must be read.
      * @throws IOException
      */
     public boolean[] pickRowGroups(
@@ -2762,12 +2898,16 @@ public class RecordReaderImpl implements
       int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
       boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
       TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
-      for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
-        for(int pred=0; pred < leafValues.length; ++pred) {
+      for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
+        for (int pred = 0; pred < leafValues.length; ++pred) {
           if (filterColumns[pred] != -1) {
             OrcProto.ColumnStatistics stats =
                 indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
-            leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
+            OrcProto.BloomFilter bf = null;
+            if (bloomFilterIndices[filterColumns[pred]] != null) {
+              bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
+            }
+            leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
             if (LOG.isDebugEnabled()) {
               LOG.debug("Stats = " + stats);
               LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
@@ -2781,20 +2921,19 @@ public class RecordReaderImpl implements
         result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
         if (LOG.isDebugEnabled()) {
           LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
-              (rowIndexStride * (rowGroup+1) - 1) + " is " +
+              (rowIndexStride * (rowGroup + 1) - 1) + " is " +
               (result[rowGroup] ? "" : "not ") + "included.");
         }
       }
 
       // if we found something to skip, use the array. otherwise, return null.
-      for (boolean b: result) {
+      for (boolean b : result) {
         if (!b) {
           return result;
         }
       }
       return null;
     }
-
   }
 
   /**
@@ -2994,7 +3133,10 @@ public class RecordReaderImpl implements
       long length = stream.getLength();
       int column = stream.getColumn();
       OrcProto.Stream.Kind streamKind = stream.getKind();
-      if (StreamName.getArea(streamKind) == StreamName.Area.DATA && includedColumns[column]) {
+      // since stream kind is optional, first check if it exists
+      if (stream.hasKind() &&
+          (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
+          includedColumns[column]) {
         // if we aren't filtering or it is a dictionary, load it.
         if (includedRowGroups == null
             || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
@@ -3042,7 +3184,8 @@ public class RecordReaderImpl implements
     for (OrcProto.Stream streamDesc: streamDescriptions) {
       int column = streamDesc.getColumn();
       if ((includeColumn != null && !includeColumn[column]) ||
-          StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA) {
+          streamDesc.hasKind() &&
+          (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
         streamOffset += streamDesc.getLength();
         continue;
       }
@@ -3247,21 +3390,22 @@ public class RecordReaderImpl implements
     throw new IllegalArgumentException("Seek after the end of reader range");
   }
 
-  OrcProto.RowIndex[] readRowIndex(
-      int stripeIndex, boolean[] included) throws IOException {
-    return readRowIndex(stripeIndex, included, null);
+  Index readRowIndex(int stripeIndex, boolean[] included) throws IOException {
+    return readRowIndex(stripeIndex, included, null, null, null);
   }
 
-  OrcProto.RowIndex[] readRowIndex(
-      int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes) throws IOException {
+  Index readRowIndex(int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes,
+      OrcProto.BloomFilterIndex[] bloomFilterIndex, boolean[] sargColumns) throws IOException {
     StripeInformation stripe = stripes.get(stripeIndex);
     OrcProto.StripeFooter stripeFooter = null;
     // if this is the current stripe, use the cached objects.
     if (stripeIndex == currentStripe) {
       stripeFooter = this.stripeFooter;
       indexes = indexes == null ? this.indexes : indexes;
+      bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
     }
-    return metadata.readRowIndex(stripe, stripeFooter, included, indexes);
+    return metadata.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
+        bloomFilterIndex);
   }
 
   private void seekToRowEntry(TreeReader reader, int rowEntry) throws IOException {
@@ -3278,10 +3422,10 @@ public class RecordReaderImpl implements
   public void seekToRow(long rowNumber) throws IOException {
     if (rowNumber < 0) {
       throw new IllegalArgumentException("Seek to a negative row number " +
-                                         rowNumber);
+          rowNumber);
     } else if (rowNumber < firstRow) {
       throw new IllegalArgumentException("Seek before reader range " +
-                                         rowNumber);
+          rowNumber);
     }
     // convert to our internal form (rows from the beginning of slice)
     rowNumber -= firstRow;
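
In sketch form, the reader-side change above evaluates each SARG leaf in two stages: evaluatePredicateMinMax consults the row group's min/max statistics first, and evaluatePredicateBloomFilter probes the bloom filter only when the range check cannot already exclude the group. The snippet below is a minimal, self-contained illustration of that pruning order for a long-typed equality predicate; GroupStats and mightMatchEquals are illustrative stand-ins, not the actual Hive classes.

import java.util.function.LongPredicate;

final class RowGroupPruningSketch {
  /** Hypothetical per-row-group metadata: min/max plus an optional bloom probe. */
  static final class GroupStats {
    final long min, max;
    final LongPredicate bloomMightContain;   // null when no bloom filter stream was written
    GroupStats(long min, long max, LongPredicate bloomMightContain) {
      this.min = min; this.max = max; this.bloomMightContain = bloomMightContain;
    }
  }

  /** True when the row group may contain rows where the column equals literal. */
  static boolean mightMatchEquals(GroupStats stats, long literal) {
    if (literal < stats.min || literal > stats.max) {
      return false;                          // the range check alone excludes the group
    }
    if (stats.bloomMightContain != null && !stats.bloomMightContain.test(literal)) {
      return false;                          // the bloom filter says "definitely absent"
    }
    return true;                             // possibly present; the group must be read
  }
}

For IN predicates, the patch applies the same bloom filter probe to each literal and keeps the row group as soon as any one of them might be present.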

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java Sat Feb 21 02:36:54 2015
@@ -46,7 +46,7 @@ public class RecordReaderUtils {
       List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
     boolean[] hasNull = new boolean[types.size()];
     for(OrcProto.Stream stream: streamList) {
-      if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
+      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
         hasNull[stream.getColumn()] = true;
       }
     }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamName.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamName.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamName.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamName.java Sat Feb 21 02:36:54 2015
@@ -75,6 +75,7 @@ class StreamName implements Comparable<S
     switch (kind) {
       case ROW_INDEX:
       case DICTIONARY_COUNT:
+      case BLOOM_FILTER:
         return Area.INDEX;
       default:
         return Area.DATA;
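
Both RecordReaderUtils and the stream-selection code in RecordReaderImpl now guard getKind() with hasKind(), because the stream kind is an optional field, and StreamName classifies the new BLOOM_FILTER streams into the index area. As a small self-contained sketch of why the guard matters for optional protobuf-style fields (the Stream class below is a stand-in, not OrcProto): reading an unset optional field returns its default value, so without hasKind() an unknown or newer stream could be mistaken for a known kind.

final class OptionalKindSketch {
  enum Kind { PRESENT, DATA, ROW_INDEX, BLOOM_FILTER }

  static final class Stream {
    private final Kind kind;                 // null models an unset optional field
    Stream(Kind kind) { this.kind = kind; }
    boolean hasKind() { return kind != null; }
    Kind getKind() { return kind != null ? kind : Kind.PRESENT; }  // unset falls back to a default
  }

  static boolean isPresentStream(Stream stream) {
    // Without the hasKind() guard, an unset kind would look like PRESENT and the
    // stream would wrongly be treated as a null-tracking stream.
    return stream.hasKind() && stream.getKind() == Kind.PRESENT;
  }
}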

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Feb 21 02:36:54 2015
@@ -26,6 +26,7 @@ import java.lang.management.ManagementFa
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.filters.BloomFilter;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
@@ -77,7 +79,9 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.Text;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 
@@ -145,23 +149,27 @@ public class WriterImpl implements Write
   private final OrcFile.WriterContext callbackContext;
   private final OrcFile.EncodingStrategy encodingStrategy;
   private final OrcFile.CompressionStrategy compressionStrategy;
+  private final boolean[] bloomFilterColumns;
+  private final double bloomFilterFpp;
 
   WriterImpl(FileSystem fs,
-             Path path,
-             Configuration conf,
-             ObjectInspector inspector,
-             long stripeSize,
-             CompressionKind compress,
-             int bufferSize,
-             int rowIndexStride,
-             MemoryManager memoryManager,
-             boolean addBlockPadding,
-             OrcFile.Version version,
-             OrcFile.WriterCallback callback,
-             OrcFile.EncodingStrategy encodingStrategy,
-             CompressionStrategy compressionStrategy,
-             float paddingTolerance,
-             long blockSizeValue) throws IOException {
+      Path path,
+      Configuration conf,
+      ObjectInspector inspector,
+      long stripeSize,
+      CompressionKind compress,
+      int bufferSize,
+      int rowIndexStride,
+      MemoryManager memoryManager,
+      boolean addBlockPadding,
+      OrcFile.Version version,
+      OrcFile.WriterCallback callback,
+      EncodingStrategy encodingStrategy,
+      CompressionStrategy compressionStrategy,
+      float paddingTolerance,
+      long blockSizeValue,
+      String bloomFilterColumnNames,
+      double bloomFilterFpp) throws IOException {
     this.fs = fs;
     this.path = path;
     this.conf = conf;
@@ -190,7 +198,20 @@ public class WriterImpl implements Write
     this.memoryManager = memoryManager;
     buildIndex = rowIndexStride > 0;
     codec = createCodec(compress);
-    this.bufferSize = getEstimatedBufferSize(bufferSize);
+    String allColumns = conf.get(IOConstants.COLUMNS);
+    if (allColumns == null) {
+      allColumns = getColumnNamesFromInspector(inspector);
+    }
+    this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize);
+    if (version == OrcFile.Version.V_0_11) {
+      /* do not write bloom filters for ORC v11 */
+      this.bloomFilterColumns =
+          OrcUtils.includeColumns(null, allColumns, inspector);
+    } else {
+      this.bloomFilterColumns =
+          OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector);
+    }
+    this.bloomFilterFpp = bloomFilterFpp;
     treeWriter = createTreeWriter(inspector, streamFactory, false);
     if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
       throw new IllegalArgumentException("Row stride must be at least " +
@@ -201,8 +222,25 @@ public class WriterImpl implements Write
     memoryManager.addWriter(path, stripeSize, this);
   }
 
+  private String getColumnNamesFromInspector(ObjectInspector inspector) {
+    List<String> fieldNames = Lists.newArrayList();
+    Joiner joiner = Joiner.on(",");
+    if (inspector instanceof StructObjectInspector) {
+      StructObjectInspector soi = (StructObjectInspector) inspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      for(StructField sf : fields) {
+        fieldNames.add(sf.getFieldName());
+      }
+    }
+    return joiner.join(fieldNames);
+  }
+
+  @VisibleForTesting
   int getEstimatedBufferSize(int bs) {
-    String colNames = conf.get(IOConstants.COLUMNS);
+      return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs);
+  }
+
+  int getEstimatedBufferSize(String colNames, int bs) {
     long availableMem = getMemoryAvailableForORC();
     if (colNames != null) {
       final int numCols = colNames.split(",").length;
@@ -459,26 +497,27 @@ public class WriterImpl implements Write
       final EnumSet<CompressionCodec.Modifier> modifiers;
 
       switch (kind) {
-      case DATA:
-      case DICTIONARY_DATA:
-        if (getCompressionStrategy() == CompressionStrategy.SPEED) {
-          modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
-        } else {
-          modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
-        }
-        break;
-      case LENGTH:
-      case DICTIONARY_COUNT:
-      case PRESENT:
-      case ROW_INDEX:
-      case SECONDARY:
-        // easily compressed using the fastest modes
-        modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
-        break;
-      default:
-        LOG.warn("Missing ORC compression modifiers for " + kind);
-        modifiers = null;
-        break;
+        case BLOOM_FILTER:
+        case DATA:
+        case DICTIONARY_DATA:
+          if (getCompressionStrategy() == CompressionStrategy.SPEED) {
+            modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
+          } else {
+            modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
+          }
+          break;
+        case LENGTH:
+        case DICTIONARY_COUNT:
+        case PRESENT:
+        case ROW_INDEX:
+        case SECONDARY:
+          // easily compressed using the fastest modes
+          modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
+          break;
+        default:
+          LOG.warn("Missing ORC compression modifiers for " + kind);
+          modifiers = null;
+          break;
       }
 
       BufferedStream result = streams.get(name);
@@ -499,6 +538,15 @@ public class WriterImpl implements Write
     }
 
     /**
+     * Get the current column id. After creating all tree writers this count should tell how many
+     * columns (including columns within nested complex objects) are created in total.
+     * @return current column id
+     */
+    public int getCurrentColumnId() {
+      return columnCount;
+    }
+
+    /**
      * Get the stride rate of the row index.
      */
     public int getRowIndexStride() {
@@ -538,6 +586,22 @@ public class WriterImpl implements Write
     }
 
     /**
+     * Get the bloom filter columns
+     * @return bloom filter columns
+     */
+    public boolean[] getBloomFilterColumns() {
+      return bloomFilterColumns;
+    }
+
+    /**
+     * Get bloom filter false positive percentage.
+     * @return fpp
+     */
+    public double getBloomFilterFPP() {
+      return bloomFilterFpp;
+    }
+
+    /**
      * Get the writer's configuration.
      * @return configuration
      */
@@ -572,6 +636,11 @@ public class WriterImpl implements Write
     private final OrcProto.RowIndex.Builder rowIndex;
     private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
     private final PositionedOutputStream rowIndexStream;
+    private final PositionedOutputStream bloomFilterStream;
+    protected final BloomFilter bloomFilter;
+    protected final boolean createBloomFilter;
+    private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+    private final OrcProto.BloomFilter.Builder bloomFilterEntry;
     private boolean foundNulls;
     private OutStream isPresentOutStream;
     private final List<StripeStatistics.Builder> stripeStatsBuilders;
@@ -598,6 +667,7 @@ public class WriterImpl implements Write
         isPresent = null;
       }
       this.foundNulls = false;
+      createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
       indexStatistics = ColumnStatisticsImpl.create(inspector);
       stripeColStatistics = ColumnStatisticsImpl.create(inspector);
       fileStatistics = ColumnStatisticsImpl.create(inspector);
@@ -607,11 +677,22 @@ public class WriterImpl implements Write
       rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
       stripeStatsBuilders = Lists.newArrayList();
       if (streamFactory.buildIndex()) {
-        rowIndexStream = streamFactory.createStream(id,
-            OrcProto.Stream.Kind.ROW_INDEX);
+        rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
       } else {
         rowIndexStream = null;
       }
+      if (createBloomFilter) {
+        bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+        bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
+        bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
+            streamFactory.getBloomFilterFPP());
+      } else {
+        bloomFilterEntry = null;
+        bloomFilterIndex = null;
+        bloomFilterStream = null;
+        bloomFilter = null;
+      }
     }
 
     protected OrcProto.RowIndex.Builder getRowIndex() {
@@ -725,6 +806,14 @@ public class WriterImpl implements Write
       }
       rowIndex.clear();
       rowIndexEntry.clear();
+
+      // write the bloom filter to out stream
+      if (bloomFilterStream != null) {
+        bloomFilterIndex.build().writeTo(bloomFilterStream);
+        bloomFilterStream.flush();
+        bloomFilterIndex.clear();
+        bloomFilterEntry.clear();
+      }
     }
 
     private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
@@ -763,12 +852,23 @@ public class WriterImpl implements Write
       indexStatistics.reset();
       rowIndex.addEntry(rowIndexEntry);
       rowIndexEntry.clear();
+      addBloomFilterEntry();
       recordPosition(rowIndexPosition);
       for(TreeWriter child: childrenWriters) {
         child.createRowIndexEntry();
       }
     }
 
+    void addBloomFilterEntry() {
+      if (createBloomFilter) {
+        bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+        bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
+        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+        bloomFilter.reset();
+        bloomFilterEntry.clear();
+      }
+    }
+
     /**
      * Record the current position in each of this column's streams.
      * @param recorder where should the locations be recorded
@@ -851,6 +951,9 @@ public class WriterImpl implements Write
       if (obj != null) {
         byte val = ((ByteObjectInspector) inspector).get(obj);
         indexStatistics.updateInteger(val);
+        if (createBloomFilter) {
+          bloomFilter.addLong(val);
+        }
         writer.write(val);
       }
     }
@@ -926,6 +1029,10 @@ public class WriterImpl implements Write
           val = shortInspector.get(obj);
         }
         indexStatistics.updateInteger(val);
+        if (createBloomFilter) {
+          // integers are converted to longs in column statistics and during SARG evaluation
+          bloomFilter.addLong(val);
+        }
         writer.write(val);
       }
     }
@@ -966,6 +1073,10 @@ public class WriterImpl implements Write
       if (obj != null) {
         float val = ((FloatObjectInspector) inspector).get(obj);
         indexStatistics.updateDouble(val);
+        if (createBloomFilter) {
+          // floats are converted to doubles in column statistics and during SARG evaluation
+          bloomFilter.addDouble(val);
+        }
         utils.writeFloat(stream, val);
       }
     }
@@ -1006,6 +1117,9 @@ public class WriterImpl implements Write
       if (obj != null) {
         double val = ((DoubleObjectInspector) inspector).get(obj);
         indexStatistics.updateDouble(val);
+        if (createBloomFilter) {
+          bloomFilter.addDouble(val);
+        }
         utils.writeDouble(stream, val);
       }
     }
@@ -1099,6 +1213,9 @@ public class WriterImpl implements Write
           directLengthOutput.write(val.getLength());
         }
         indexStatistics.updateString(val);
+        if (createBloomFilter) {
+          bloomFilter.addBytes(val.getBytes(), val.getLength());
+        }
       }
     }
 
@@ -1258,6 +1375,7 @@ public class WriterImpl implements Write
       OrcProto.RowIndexEntry base = rowIndexEntry.build();
       savedRowIndex.add(base);
       rowIndexEntry.clear();
+      addBloomFilterEntry();
       recordPosition(rowIndexPosition);
       rowIndexValueCount.add(Long.valueOf(rows.size()));
       if (strideDictionaryCheck) {
@@ -1368,6 +1486,9 @@ public class WriterImpl implements Write
         stream.write(val.getBytes(), 0, val.getLength());
         length.write(val.getLength());
         indexStatistics.updateBinary(val);
+        if (createBloomFilter) {
+          bloomFilter.addBytes(val.getBytes(), val.getLength());
+        }
       }
     }
 
@@ -1430,6 +1551,9 @@ public class WriterImpl implements Write
         indexStatistics.updateTimestamp(val);
         seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP);
         nanos.write(formatNanos(val.getNanos()));
+        if (createBloomFilter) {
+          bloomFilter.addLong(val.getTime());
+        }
       }
     }
 
@@ -1490,6 +1614,9 @@ public class WriterImpl implements Write
         DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
         indexStatistics.updateDate(val);
         writer.write(val.getDays());
+        if (createBloomFilter) {
+          bloomFilter.addLong(val.getDays());
+        }
       }
     }
 
@@ -1558,6 +1685,9 @@ public class WriterImpl implements Write
             decimal.unscaledValue());
         scaleStream.write(decimal.scale());
         indexStatistics.updateDecimal(decimal);
+        if (createBloomFilter) {
+          bloomFilter.addString(decimal.toString());
+        }
       }
     }
 
@@ -1657,6 +1787,9 @@ public class WriterImpl implements Write
         ListObjectInspector insp = (ListObjectInspector) inspector;
         int len = insp.getListLength(obj);
         lengths.write(len);
+        if (createBloomFilter) {
+          bloomFilter.addLong(len);
+        }
         for(int i=0; i < len; ++i) {
           childrenWriters[0].write(insp.getListElement(obj, i));
         }
@@ -1721,6 +1854,9 @@ public class WriterImpl implements Write
         // accessor in the MapObjectInspector.
         Map<?, ?> valueMap = insp.getMap(obj);
         lengths.write(valueMap.size());
+        if (createBloomFilter) {
+          bloomFilter.addLong(valueMap.size());
+        }
         for(Map.Entry<?, ?> entry: valueMap.entrySet()) {
           childrenWriters[0].write(entry.getKey());
           childrenWriters[1].write(entry.getValue());
@@ -1773,6 +1909,9 @@ public class WriterImpl implements Write
         UnionObjectInspector insp = (UnionObjectInspector) inspector;
         byte tag = insp.getTag(obj);
         tags.write(tag);
+        if (createBloomFilter) {
+          bloomFilter.addLong(tag);
+        }
         childrenWriters[tag].write(insp.getField(obj));
       }
     }
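
On the write path, each tree writer whose column is selected in bloomFilterColumns creates a BloomFilter sized to the row index stride with the configured false-positive probability, adds every written value (addLong/addDouble/addString/addBytes), snapshots it into the BloomFilterIndex whenever a row index entry is created (addBloomFilterEntry), and then resets it for the next stride. The sketch below shows that per-stride lifecycle in rough form, using Guava's BloomFilter as a stand-in for org.apache.hadoop.hive.ql.io.filters.BloomFilter and long values only.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.util.ArrayList;
import java.util.List;

final class StrideBloomFilterSketch {
  private final int rowIndexStride;
  private final double fpp;
  private BloomFilter<Long> current;
  private final List<BloomFilter<Long>> perStrideFilters = new ArrayList<>();

  StrideBloomFilterSketch(int rowIndexStride, double fpp) {
    this.rowIndexStride = rowIndexStride;
    this.fpp = fpp;
    this.current = BloomFilter.create(Funnels.longFunnel(), rowIndexStride, fpp);
  }

  void writeValue(long value) {
    current.put(value);                      // mirrors bloomFilter.addLong(val) in the writers
  }

  void createRowIndexEntry() {
    perStrideFilters.add(current);           // mirrors addBloomFilterEntry(): snapshot the stride's filter
    current = BloomFilter.create(Funnels.longFunnel(), rowIndexStride, fpp);
  }
}

Hive resets and reuses a single filter per column; the sketch allocates a fresh one per stride purely to keep the snapshots independent.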

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java Sat Feb 21 02:36:54 2015
@@ -21,7 +21,6 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -29,10 +28,10 @@ import org.apache.hadoop.hive.ql.io.IOCo
 import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
 import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
 import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -41,27 +40,25 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.util.ContextUtil;
 
 /**
  *
  * A Parquet OutputFormat for Hive (with the deprecated package mapred)
  *
  */
-public class MapredParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements
-  HiveOutputFormat<Void, ArrayWritable> {
+public class MapredParquetOutputFormat extends FileOutputFormat<Void, ParquetHiveRecord> implements
+  HiveOutputFormat<Void, ParquetHiveRecord> {
 
   private static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);
 
-  protected ParquetOutputFormat<ArrayWritable> realOutputFormat;
+  protected ParquetOutputFormat<ParquetHiveRecord> realOutputFormat;
 
   public MapredParquetOutputFormat() {
-    realOutputFormat = new ParquetOutputFormat<ArrayWritable>(new DataWritableWriteSupport());
+    realOutputFormat = new ParquetOutputFormat<ParquetHiveRecord>(new DataWritableWriteSupport());
   }
 
-  public MapredParquetOutputFormat(final OutputFormat<Void, ArrayWritable> mapreduceOutputFormat) {
-    realOutputFormat = (ParquetOutputFormat<ArrayWritable>) mapreduceOutputFormat;
+  public MapredParquetOutputFormat(final OutputFormat<Void, ParquetHiveRecord> mapreduceOutputFormat) {
+    realOutputFormat = (ParquetOutputFormat<ParquetHiveRecord>) mapreduceOutputFormat;
   }
 
   @Override
@@ -70,7 +67,7 @@ public class MapredParquetOutputFormat e
   }
 
   @Override
-  public RecordWriter<Void, ArrayWritable> getRecordWriter(
+  public RecordWriter<Void, ParquetHiveRecord> getRecordWriter(
       final FileSystem ignored,
       final JobConf job,
       final String name,
@@ -119,7 +116,7 @@ public class MapredParquetOutputFormat e
   }
 
   protected ParquetRecordWriterWrapper getParquerRecordWriterWrapper(
-      ParquetOutputFormat<ArrayWritable> realOutputFormat,
+      ParquetOutputFormat<ParquetHiveRecord> realOutputFormat,
       JobConf jobConf,
       String finalOutPath,
       Progressable progress,

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java Sat Feb 21 02:36:54 2015
@@ -13,61 +13,31 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.serde;
 
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.ParquetWriter;
-import parquet.io.api.Binary;
 
 /**
  *
@@ -110,6 +80,13 @@ public class ParquetHiveSerDe extends Ab
   private long deserializedSize;
   private String compressionType;
 
+  private ParquetHiveRecord parquetRow;
+
+  public ParquetHiveSerDe() {
+    parquetRow = new ParquetHiveRecord();
+    stats = new SerDeStats();
+  }
+
   @Override
   public final void initialize(final Configuration conf, final Properties tbl) throws SerDeException {
 
@@ -144,7 +121,6 @@ public class ParquetHiveSerDe extends Ab
     this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
 
     // Stats part
-    stats = new SerDeStats();
     serializedSize = 0;
     deserializedSize = 0;
     status = LAST_OPERATION.UNKNOWN;
@@ -169,7 +145,7 @@ public class ParquetHiveSerDe extends Ab
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return ArrayWritable.class;
+    return ParquetHiveRecord.class;
   }
 
   @Override
@@ -178,154 +154,11 @@ public class ParquetHiveSerDe extends Ab
     if (!objInspector.getCategory().equals(Category.STRUCT)) {
       throw new SerDeException("Cannot serialize " + objInspector.getCategory() + ". Can only serialize a struct");
     }
-    final ArrayWritable serializeData = createStruct(obj, (StructObjectInspector) objInspector);
-    serializedSize = serializeData.get().length;
+    serializedSize = ((StructObjectInspector)objInspector).getAllStructFieldRefs().size();
     status = LAST_OPERATION.SERIALIZE;
-    return serializeData;
-  }
-
-  private ArrayWritable createStruct(final Object obj, final StructObjectInspector inspector)
-      throws SerDeException {
-    final List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-    final Writable[] arr = new Writable[fields.size()];
-    for (int i = 0; i < fields.size(); i++) {
-      final StructField field = fields.get(i);
-      final Object subObj = inspector.getStructFieldData(obj, field);
-      final ObjectInspector subInspector = field.getFieldObjectInspector();
-      arr[i] = createObject(subObj, subInspector);
-    }
-    return new ArrayWritable(Writable.class, arr);
-  }
-
-  private Writable createMap(final Object obj, final MapObjectInspector inspector)
-      throws SerDeException {
-    final Map<?, ?> sourceMap = inspector.getMap(obj);
-    final ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
-    final ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
-    final List<ArrayWritable> array = new ArrayList<ArrayWritable>();
-
-    if (sourceMap != null) {
-      for (final Entry<?, ?> keyValue : sourceMap.entrySet()) {
-        final Writable key = createObject(keyValue.getKey(), keyInspector);
-        final Writable value = createObject(keyValue.getValue(), valueInspector);
-        if (key != null) {
-          Writable[] arr = new Writable[2];
-          arr[0] = key;
-          arr[1] = value;
-          array.add(new ArrayWritable(Writable.class, arr));
-        }
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class,
-          array.toArray(new ArrayWritable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private ArrayWritable createArray(final Object obj, final ListObjectInspector inspector)
-      throws SerDeException {
-    final List<?> sourceArray = inspector.getList(obj);
-    final ObjectInspector subInspector = inspector.getListElementObjectInspector();
-    final List<Writable> array = new ArrayList<Writable>();
-    if (sourceArray != null) {
-      for (final Object curObj : sourceArray) {
-        array.add(createObject(curObj, subInspector));
-      }
-    }
-    if (array.size() > 0) {
-      final ArrayWritable subArray = new ArrayWritable(Writable.class,
-          array.toArray(new Writable[array.size()]));
-      return new ArrayWritable(Writable.class, new Writable[] {subArray});
-    } else {
-      return null;
-    }
-  }
-
-  private Writable createPrimitive(final Object obj, final PrimitiveObjectInspector inspector)
-      throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-    switch (inspector.getPrimitiveCategory()) {
-    case VOID:
-      return null;
-    case BOOLEAN:
-      return new BooleanWritable(((BooleanObjectInspector) inspector).get(obj) ? Boolean.TRUE : Boolean.FALSE);
-    case BYTE:
-      return new ByteWritable(((ByteObjectInspector) inspector).get(obj));
-    case DOUBLE:
-      return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
-    case FLOAT:
-      return new FloatWritable(((FloatObjectInspector) inspector).get(obj));
-    case INT:
-      return new IntWritable(((IntObjectInspector) inspector).get(obj));
-    case LONG:
-      return new LongWritable(((LongObjectInspector) inspector).get(obj));
-    case SHORT:
-      return new ShortWritable(((ShortObjectInspector) inspector).get(obj));
-    case STRING:
-      String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(obj);
-      try {
-        return new BytesWritable(v.getBytes("UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        throw new SerDeException("Failed to encode string in UTF-8", e);
-      }
-    case DECIMAL:
-      HiveDecimal hd = (HiveDecimal)inspector.getPrimitiveJavaObject(obj);
-      DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) inspector.getTypeInfo();
-      int prec = decTypeInfo.precision();
-      int scale = decTypeInfo.scale();
-      byte[] src = hd.setScale(scale).unscaledValue().toByteArray();
-      // Estimated number of bytes needed.
-      int bytes =  PRECISION_TO_BYTE_COUNT[prec - 1];
-      if (bytes == src.length) {
-        // No padding needed.
-        return new BytesWritable(src);
-      }
-      byte[] tgt = new byte[bytes];
-      if ( hd.signum() == -1) {
-        // For negative number, initializing bits to 1
-        for (int i = 0; i < bytes; i++) {
-          tgt[i] |= 0xFF;
-        }
-      }
-      System.arraycopy(src, 0, tgt, bytes - src.length, src.length); // Padding leading zeroes/ones.
-      return new BytesWritable(tgt);
-    case TIMESTAMP:
-      return new TimestampWritable(((TimestampObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    case CHAR:
-      String strippedValue = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(obj).getStrippedValue();
-      return new BytesWritable(Binary.fromString(strippedValue).getBytes());
-    case VARCHAR:
-      String value = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(obj).getValue();
-      return new BytesWritable(Binary.fromString(value).getBytes());
-    case BINARY:
-      return new BytesWritable(((BinaryObjectInspector) inspector).getPrimitiveJavaObject(obj));
-    default:
-      throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
-    }
-  }
-
-  private Writable createObject(final Object obj, final ObjectInspector inspector) throws SerDeException {
-    if (obj == null) {
-      return null;
-    }
-
-    switch (inspector.getCategory()) {
-    case STRUCT:
-      return createStruct(obj, (StructObjectInspector) inspector);
-    case LIST:
-      return createArray(obj, (ListObjectInspector) inspector);
-    case MAP:
-      return createMap(obj, (MapObjectInspector) inspector);
-    case PRIMITIVE:
-      return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
-    default:
-      throw new SerDeException("Unknown data type" + inspector.getCategory());
-    }
+    parquetRow.value = obj;
+    parquetRow.inspector= (StructObjectInspector)objInspector;
+    return parquetRow;
   }
 
   @Override
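
The Parquet changes above replace the eager ArrayWritable tree built by ParquetHiveSerDe.serialize() with a reused ParquetHiveRecord that simply carries the raw row object and its StructObjectInspector; DataWritableWriter then walks the row through the inspectors and feeds the Parquet RecordConsumer directly. A simplified sketch of that lazy handoff is below; LazyRecord, RowInspector, and emit() are hypothetical stand-ins for the real serde2 and Parquet classes.

final class LazyRecord {
  Object row;                        // the original Hive row object, left untouched
  RowInspector inspector;            // knows how to pull typed field values out of 'row'
}

interface RowInspector {
  int fieldCount();
  Object fieldValue(Object row, int index);
}

final class LazyRecordWriterSketch {
  void write(LazyRecord record) {
    for (int i = 0; i < record.inspector.fieldCount(); i++) {
      Object value = record.inspector.fieldValue(record.row, i);
      if (value != null) {           // Parquet does not write null elements
        emit(i, value);              // hand the value straight to the column writer
      }
    }
  }

  private void emit(int fieldIndex, Object value) {
    // a real implementation would call the Parquet RecordConsumer here
  }
}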

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriteSupport.java Sat Feb 21 02:36:54 2015
@@ -16,7 +16,7 @@ package org.apache.hadoop.hive.ql.io.par
 import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 
 import parquet.hadoop.api.WriteSupport;
 import parquet.io.api.RecordConsumer;
@@ -28,7 +28,7 @@ import parquet.schema.MessageTypeParser;
  * DataWritableWriteSupport is a WriteSupport for the DataWritableWriter
  *
  */
-public class DataWritableWriteSupport extends WriteSupport<ArrayWritable> {
+public class DataWritableWriteSupport extends WriteSupport<ParquetHiveRecord> {
 
   public static final String PARQUET_HIVE_SCHEMA = "parquet.hive.schema";
 
@@ -55,7 +55,7 @@ public class DataWritableWriteSupport ex
   }
 
   @Override
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     writer.write(record);
   }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java Sat Feb 21 02:36:54 2015
@@ -13,37 +13,29 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.write;
 
-import java.sql.Timestamp;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import parquet.io.api.Binary;
 import parquet.io.api.RecordConsumer;
 import parquet.schema.GroupType;
 import parquet.schema.OriginalType;
 import parquet.schema.Type;
 
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+
 /**
  *
- * DataWritableWriter is a writer,
- * that will read an ArrayWritable and give the data to parquet
- * with the expected schema
- * This is a helper class used by DataWritableWriteSupport class.
+ * DataWritableWriter is a writer that reads a ParquetHiveRecord object and sends the data to the Parquet
+ * API with the expected schema. This class is only used through the DataWritableWriteSupport class.
  */
 public class DataWritableWriter {
   private static final Log LOG = LogFactory.getLog(DataWritableWriter.class);
@@ -57,13 +49,13 @@ public class DataWritableWriter {
 
   /**
    * It writes all record values to the Parquet RecordConsumer.
-   * @param record Contains the record of values that are going to be written
+   * @param record Contains the record that is going to be written.
    */
-  public void write(final ArrayWritable record) {
+  public void write(final ParquetHiveRecord record) {
     if (record != null) {
       recordConsumer.startMessage();
       try {
-        writeGroupFields(record, schema);
+        writeGroupFields(record.getObject(), record.getObjectInspector(), schema);
       } catch (RuntimeException e) {
         String errorMessage = "Parquet record is malformed: " + e.getMessage();
         LOG.error(errorMessage, e);
@@ -76,19 +68,23 @@ public class DataWritableWriter {
   /**
    * It writes all the fields contained inside a group to the RecordConsumer.
    * @param value The list of values contained in the group.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the group schema.
    */
-  public void writeGroupFields(final ArrayWritable value, final GroupType type) {
+  private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) {
     if (value != null) {
+      List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+      List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value);
+
       for (int i = 0; i < type.getFieldCount(); i++) {
         Type fieldType = type.getType(i);
         String fieldName = fieldType.getName();
-        Writable fieldValue = value.get()[i];
+        Object fieldValue = fieldValuesList.get(i);
 
-        // Parquet does not write null elements
         if (fieldValue != null) {
+          ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector();
           recordConsumer.startField(fieldName, i);
-          writeValue(fieldValue, fieldType);
+          writeValue(fieldValue, fieldInspector, fieldType);
           recordConsumer.endField(fieldName, i);
         }
       }
@@ -96,68 +92,93 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and writes
+   * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls
    * the correct write function.
    * @param value The writable object that contains the value.
+   * @param inspector The object inspector used to get the correct value type.
    * @param type Type that contains information about the type schema.
    */
-  private void writeValue(final Writable value, final Type type) {
+  private void writeValue(final Object value, final ObjectInspector inspector, final Type type) {
     if (type.isPrimitive()) {
-      writePrimitive(value);
-    } else if (value instanceof ArrayWritable) {
+      checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE);
+      writePrimitive(value, (PrimitiveObjectInspector)inspector);
+    } else {
       GroupType groupType = type.asGroupType();
       OriginalType originalType = type.getOriginalType();
 
       if (originalType != null && originalType.equals(OriginalType.LIST)) {
-        writeArray((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.LIST);
+        writeArray(value, (ListObjectInspector)inspector, groupType);
       } else if (originalType != null && originalType.equals(OriginalType.MAP)) {
-        writeMap((ArrayWritable)value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.MAP);
+        writeMap(value, (MapObjectInspector)inspector, groupType);
       } else {
-        writeGroup((ArrayWritable) value, groupType);
+        checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT);
+        writeGroup(value, (StructObjectInspector)inspector, groupType);
       }
-    } else {
-      throw new RuntimeException("Field value is not an ArrayWritable object: " + type);
+    }
+  }
+
+  /**
+   * Checks that an inspector matches the category indicated as a parameter.
+   * @param inspector The object inspector to check
+   * @param category The category to match
+   * @throws IllegalArgumentException if inspector does not match the category
+   */
+  private void checkInspectorCategory(ObjectInspector inspector, ObjectInspector.Category category) {
+    if (!inspector.getCategory().equals(category)) {
+      throw new IllegalArgumentException("Invalid data type: expected " + category
+          + " type, but found: " + inspector.getCategory());
     }
   }
 
   /**
    * It writes a group type and all its values to the Parquet RecordConsumer.
    * This is used only for optional and required groups.
-   * @param value ArrayWritable object that contains the group values
-   * @param type Type that contains information about the group schema
+   * @param value Object that contains the group values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group schema.
    */
-  private void writeGroup(final ArrayWritable value, final GroupType type) {
+  private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) {
     recordConsumer.startGroup();
-    writeGroupFields(value, type);
+    writeGroupFields(value, inspector, type);
     recordConsumer.endGroup();
   }
 
   /**
-   * It writes a map type and its key-pair values to the Parquet RecordConsumer.
-   * This is called when the original type (MAP) is detected by writeValue()
-   * @param value The list of map values that contains the repeated KEY_PAIR_VALUE group type
-   * @param type Type that contains information about the group schema
+   * It writes a list type and its array elements to the Parquet RecordConsumer.
+   * This is called when the original type (LIST) is detected by writeValue().
+   * This function assumes the following schema:
+   *    optional group arrayCol (LIST) {
+   *      repeated group array {
+   *        optional TYPE array_element;
+   *      }
+   *    }
+   * @param value The object that contains the array values.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (LIST) schema.
    */
-  private void writeMap(final ArrayWritable value, final GroupType type) {
+  private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) {
+    // Get the internal array structure
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)value.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] map_values = repeatedValue.get();
-    for (int record = 0; record < map_values.length; record++) {
-      Writable key_value_pair = map_values[record];
-      if (key_value_pair != null) {
-        // Hive wraps a map key-pair into an ArrayWritable
-        if (key_value_pair instanceof ArrayWritable) {
-          writeGroup((ArrayWritable)key_value_pair, repeatedType);
-        } else {
-          throw new RuntimeException("Map key-value pair is not an ArrayWritable object on record " + record);
-        }
-      } else {
-        throw new RuntimeException("Map key-value pair is null on record " + record);
+    List<?> arrayValues = inspector.getList(value);
+    ObjectInspector elementInspector = inspector.getListElementObjectInspector();
+
+    Type elementType = repeatedType.getType(0);
+    String elementName = elementType.getName();
+
+    for (Object element : arrayValues) {
+      recordConsumer.startGroup();
+      if (element != null) {
+        recordConsumer.startField(elementName, 0);
+        writeValue(element, elementInspector, elementType);
+        recordConsumer.endField(elementName, 0);
       }
+      recordConsumer.endGroup();
     }
 
     recordConsumer.endField(repeatedType.getName(), 0);
@@ -165,35 +186,53 @@ public class DataWritableWriter {
   }
 
   /**
-   * It writes a list type and its array elements to the Parquet RecordConsumer.
-   * This is called when the original type (LIST) is detected by writeValue()
-   * @param array The list of array values that contains the repeated array group type
-   * @param type Type that contains information about the group schema
+   * It writes a map type and its key-value pairs to the Parquet RecordConsumer.
+   * This is called when the original type (MAP) is detected by writeValue().
+   * This function assumes the following schema:
+   *    optional group mapCol (MAP) {
+   *      repeated group map (MAP_KEY_VALUE) {
+   *        required TYPE key;
+   *        optional TYPE value;
+   *      }
+   *    }
+   * @param value The object that contains the map key-value pairs.
+   * @param inspector The object inspector used to get the correct value type.
+   * @param type Type that contains information about the group (MAP) schema.
    */
-  private void writeArray(final ArrayWritable array, final GroupType type) {
+  private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) {
+    // Get the internal map structure (MAP_KEY_VALUE)
     GroupType repeatedType = type.getType(0).asGroupType();
-    ArrayWritable repeatedValue = (ArrayWritable)array.get()[0];
 
     recordConsumer.startGroup();
     recordConsumer.startField(repeatedType.getName(), 0);
 
-    Writable[] array_values = repeatedValue.get();
-    for (int record = 0; record < array_values.length; record++) {
-      recordConsumer.startGroup();
+    Map<?, ?> mapValues = inspector.getMap(value);
 
-      // Null values must be wrapped into startGroup/endGroup
-      Writable element = array_values[record];
-      if (element != null) {
-        for (int i = 0; i < type.getFieldCount(); i++) {
-          Type fieldType = repeatedType.getType(i);
-          String fieldName = fieldType.getName();
+    Type keyType = repeatedType.getType(0);
+    String keyName = keyType.getName();
+    ObjectInspector keyInspector = inspector.getMapKeyObjectInspector();
 
-          recordConsumer.startField(fieldName, i);
-          writeValue(element, fieldType);
-          recordConsumer.endField(fieldName, i);
+    Type valueType = repeatedType.getType(1);
+    String valueName = valueType.getName();
+    ObjectInspector valueInspector = inspector.getMapValueObjectInspector();
+
+    for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) {
+      recordConsumer.startGroup();
+      if (keyValue != null) {
+        // write key element
+        Object keyElement = keyValue.getKey();
+        recordConsumer.startField(keyName, 0);
+        writeValue(keyElement, keyInspector, keyType);
+        recordConsumer.endField(keyName, 0);
+
+        // write value element
+        Object valueElement = keyValue.getValue();
+        if (valueElement != null) {
+          recordConsumer.startField(valueName, 1);
+          writeValue(valueElement, valueInspector, valueType);
+          recordConsumer.endField(valueName, 1);
         }
       }
-
       recordConsumer.endGroup();
     }
 
@@ -203,36 +242,89 @@ public class DataWritableWriter {
 
   /**
    * It writes the primitive value to the Parquet RecordConsumer.
-   * @param value The writable object that contains the primitive value.
+   * @param value The object that contains the primitive value.
+   * @param inspector The object inspector used to get the correct value type.
    */
-  private void writePrimitive(final Writable value) {
+  private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) {
     if (value == null) {
       return;
     }
-    if (value instanceof DoubleWritable) {
-      recordConsumer.addDouble(((DoubleWritable) value).get());
-    } else if (value instanceof BooleanWritable) {
-      recordConsumer.addBoolean(((BooleanWritable) value).get());
-    } else if (value instanceof FloatWritable) {
-      recordConsumer.addFloat(((FloatWritable) value).get());
-    } else if (value instanceof IntWritable) {
-      recordConsumer.addInteger(((IntWritable) value).get());
-    } else if (value instanceof LongWritable) {
-      recordConsumer.addLong(((LongWritable) value).get());
-    } else if (value instanceof ShortWritable) {
-      recordConsumer.addInteger(((ShortWritable) value).get());
-    } else if (value instanceof ByteWritable) {
-      recordConsumer.addInteger(((ByteWritable) value).get());
-    } else if (value instanceof HiveDecimalWritable) {
-      throw new UnsupportedOperationException("HiveDecimalWritable writing not implemented");
-    } else if (value instanceof BytesWritable) {
-      recordConsumer.addBinary((Binary.fromByteArray(((BytesWritable) value).getBytes())));
-    } else if (value instanceof TimestampWritable) {
-      Timestamp ts = ((TimestampWritable) value).getTimestamp();
-      NanoTime nt = NanoTimeUtils.getNanoTime(ts, false);
-      nt.writeValue(recordConsumer);
-    } else {
-      throw new IllegalArgumentException("Unknown value type: " + value + " " + value.getClass());
+
+    switch (inspector.getPrimitiveCategory()) {
+      case VOID:
+        return;
+      case DOUBLE:
+        recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value));
+        break;
+      case BOOLEAN:
+        recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value));
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value));
+        break;
+      case BYTE:
+        recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value));
+        break;
+      case INT:
+        recordConsumer.addInteger(((IntObjectInspector) inspector).get(value));
+        break;
+      case LONG:
+        recordConsumer.addLong(((LongObjectInspector) inspector).get(value));
+        break;
+      case SHORT:
+        recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value));
+        break;
+      case STRING:
+        String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromString(v));
+        break;
+      case CHAR:
+        String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue();
+        recordConsumer.addBinary(Binary.fromString(vChar));
+        break;
+      case VARCHAR:
+        String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue();
+        recordConsumer.addBinary(Binary.fromString(vVarchar));
+        break;
+      case BINARY:
+        byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(Binary.fromByteArray(vBinary));
+        break;
+      case TIMESTAMP:
+        Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value);
+        recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary());
+        break;
+      case DECIMAL:
+        HiveDecimal vDecimal = ((HiveDecimal)inspector.getPrimitiveJavaObject(value));
+        DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo();
+        recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory());
+    }
+  }
+
+  private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) {
+    int prec = decimalTypeInfo.precision();
+    int scale = decimalTypeInfo.scale();
+    byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray();
+
+    // Estimated number of bytes needed.
+    int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
+    if (precToBytes == decimalBytes.length) {
+      // No padding needed.
+      return Binary.fromByteArray(decimalBytes);
     }
+
+    byte[] tgt = new byte[precToBytes];
+    if (hiveDecimal.signum() == -1) {
+      // For negative numbers, initialize all padding bits to 1 (sign extension).
+      for (int i = 0; i < precToBytes; i++) {
+        tgt[i] |= 0xFF;
+      }
+    }
+
+    System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones.
+    return Binary.fromByteArray(tgt);
   }
 }
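
For reference, the padding performed by decimalToBinary() above can be reproduced outside Hive with a short, self-contained sketch. The three-byte width used here for DECIMAL(5,2) stands in for ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT, whose table is not shown in this diff, and the class and method names below are illustrative only.

import java.math.BigDecimal;
import java.util.Arrays;

public class DecimalPaddingSketch {

  // Mirror of the padding scheme in decimalToBinary(): emit the unscaled value as a
  // fixed-length, big-endian two's-complement byte array, sign-extending with 0xFF
  // for negative values and 0x00 for non-negative ones.
  static byte[] toFixedLength(BigDecimal d, int scale, int byteCount) {
    byte[] unscaled = d.setScale(scale).unscaledValue().toByteArray();
    if (unscaled.length == byteCount) {
      return unscaled;                      // already the expected width, no padding needed
    }
    byte[] out = new byte[byteCount];       // zero-filled by default (positive padding)
    if (d.signum() == -1) {
      Arrays.fill(out, (byte) 0xFF);        // sign-extend negative values
    }
    // Right-align the significant bytes; the leading bytes keep the sign extension.
    System.arraycopy(unscaled, 0, out, byteCount - unscaled.length, unscaled.length);
    return out;
  }

  public static void main(String[] args) {
    // DECIMAL(5,2) value -1.00 has unscaled value -100, which is 0x9C as a single
    // two's-complement byte; padded to three bytes it becomes FF FF 9C.
    for (byte b : toFixedLength(new BigDecimal("-1.00"), 2, 3)) {
      System.out.printf("%02X ", b & 0xFF);
    }
    System.out.println();
  }
}

Running the sketch prints FF FF 9C: the unscaled value -100 right-aligned in three sign-extended bytes, which is the byte pattern the writer above would hand to Binary.fromByteArray for such a column.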

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/ParquetRecordWriterWrapper.java Sat Feb 21 02:36:54 2015
@@ -20,7 +20,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -29,22 +28,23 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord;
 import org.apache.hadoop.util.Progressable;
 
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.util.ContextUtil;
 
-public class ParquetRecordWriterWrapper implements RecordWriter<Void, ArrayWritable>,
+public class ParquetRecordWriterWrapper implements RecordWriter<Void, ParquetHiveRecord>,
   org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
 
   public static final Log LOG = LogFactory.getLog(ParquetRecordWriterWrapper.class);
 
-  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ArrayWritable> realWriter;
+  private final org.apache.hadoop.mapreduce.RecordWriter<Void, ParquetHiveRecord> realWriter;
   private final TaskAttemptContext taskContext;
 
   public ParquetRecordWriterWrapper(
-      final OutputFormat<Void, ArrayWritable> realOutputFormat,
+      final OutputFormat<Void, ParquetHiveRecord> realOutputFormat,
       final JobConf jobConf,
       final String name,
       final Progressable progress, Properties tableProperties) throws
@@ -106,7 +106,7 @@ public class ParquetRecordWriterWrapper
   }
 
   @Override
-  public void write(final Void key, final ArrayWritable value) throws IOException {
+  public void write(final Void key, final ParquetHiveRecord value) throws IOException {
     try {
       realWriter.write(key, value);
     } catch (final InterruptedException e) {
@@ -121,7 +121,7 @@ public class ParquetRecordWriterWrapper
 
   @Override
   public void write(final Writable w) throws IOException {
-    write(null, (ArrayWritable) w);
+    write(null, (ParquetHiveRecord) w);
   }
 
 }
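
To see how the Parquet pieces above fit together, the sketch below traces one row from the serde to the Parquet writer. It is illustrative only: the serde, record writer, row object, and inspector are assumed to be provided by Hive's file sink setup, and only the serialize()/write() calls reflect interfaces touched by this patch.

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;

// Illustrative helper, not part of this patch: one row flowing from the serde into Parquet.
final class ParquetWritePathSketch {
  static void writeRow(ParquetHiveSerDe serde,
                       FileSinkOperator.RecordWriter writer,
                       Object row, ObjectInspector rowInspector)
      throws SerDeException, IOException {
    // After this commit, serialize() returns a ParquetHiveRecord (still a Writable)
    // carrying the raw row object together with its StructObjectInspector.
    Writable w = serde.serialize(row, rowInspector);
    // ParquetRecordWriterWrapper.write(Writable) casts back to ParquetHiveRecord and
    // hands it to DataWritableWriteSupport/DataWritableWriter through the Parquet RecordWriter.
    writer.write(w);
  }
}

The behavioural difference from the old path is the carrier type: a ParquetHiveRecord holding the raw row and its StructObjectInspector, instead of a fully converted ArrayWritable.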

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sat Feb 21 02:36:54 2015
@@ -29,7 +29,6 @@ import static org.apache.hadoop.hive.ser
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -49,8 +48,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -73,7 +70,6 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.EventRequestType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
 import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
@@ -102,6 +98,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -156,6 +154,31 @@ public class Hive {
     }
   };
 
+  // Register all permanent functions. This needs improvement.
+  static {
+    try {
+      reloadFunctions();
+    } catch (Exception e) {
+      LOG.warn("Failed to access metastore. This class should not accessed in runtime.",e);
+    }
+  }
+
+  public static void reloadFunctions() throws HiveException {
+    Hive db = Hive.get();
+    for (String dbName : db.getAllDatabases()) {
+      for (String functionName : db.getFunctions(dbName, "*")) {
+        Function function = db.getFunction(dbName, functionName);
+        try {
+          FunctionRegistry.registerPermanentFunction(functionName, function.getClassName(), false,
+              FunctionTask.toFunctionResource(function.getResourceUris()));
+        } catch (Exception e) {
+          LOG.warn("Failed to register persistent function " +
+              functionName + ":" + function.getClassName() + ". Ignore and continue.");
+        }
+      }
+    }
+  }
+
   public static Hive get(Configuration c, Class<?> clazz) throws HiveException {
     return get(c instanceof HiveConf ? (HiveConf)c : new HiveConf(c, clazz));
   }
@@ -1590,10 +1613,18 @@ private void constructOneLBLocationMap(F
       throws HiveException {
     List<Path> newFiles = new ArrayList<Path>();
     Table tbl = getTable(tableName);
+    HiveConf sessionConf = SessionState.getSessionConf();
     if (replace) {
-      tbl.replaceFiles(loadPath, isSrcLocal);
+      Path tableDest = tbl.getPath();
+      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal);
     } else {
-      tbl.copyFiles(loadPath, isSrcLocal, isAcid, newFiles);
+      FileSystem fs;
+      try {
+        fs = tbl.getDataLocation().getFileSystem(sessionConf);
+        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
+      } catch (IOException e) {
+        throw new HiveException("addFiles: filesystem error in check phase", e);
+      }
       tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true");
     }
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Sat Feb 21 02:36:54 2015
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -246,10 +247,8 @@ public class Partition implements Serial
   final public Deserializer getDeserializer() {
     if (deserializer == null) {
       try {
-        deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(),
+        deserializer = MetaStoreUtils.getDeserializer(SessionState.getSessionConf(),
             tPartition, table.getTTable());
-      } catch (HiveException e) {
-        throw new RuntimeException(e);
       } catch (MetaException e) {
         throw new RuntimeException(e);
       }
@@ -367,7 +366,7 @@ public class Partition implements Serial
     try {
       // Previously, this got the filesystem of the Table, which could be
       // different from the filesystem of the partition.
-      FileSystem fs = getDataLocation().getFileSystem(Hive.get().getConf());
+      FileSystem fs = getDataLocation().getFileSystem(SessionState.getSessionConf());
       String pathPattern = getDataLocation().toString();
       if (getBucketCount() > 0) {
         pathPattern = pathPattern + "/*";
@@ -495,11 +494,11 @@ public class Partition implements Serial
   public List<FieldSchema> getCols() {
 
     try {
-      if (Table.hasMetastoreBasedSchema(Hive.get().getConf(), tPartition.getSd())) {
+      if (Table.hasMetastoreBasedSchema(SessionState.getSessionConf(), tPartition.getSd())) {
         return tPartition.getSd().getCols();
       }
-      return Hive.getFieldsFromDeserializer(table.getTableName(), getDeserializer());
-    } catch (HiveException e) {
+      return MetaStoreUtils.getFieldsFromDeserializer(table.getTableName(), getDeserializer());
+    } catch (Exception e) {
       LOG.error("Unable to get cols from serde: " +
           tPartition.getSd().getSerdeInfo().getSerializationLib(), e);
     }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1661271&r1=1661270&r2=1661271&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Sat Feb 21 02:36:54 2015
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,7 +40,6 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.ProtectMode;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -259,7 +257,7 @@ public class Table implements Serializab
   }
 
   final public Class<? extends Deserializer> getDeserializerClass() throws Exception {
-    return MetaStoreUtils.getDeserializerClass(Hive.get().getConf(), tTable);
+    return MetaStoreUtils.getDeserializerClass(SessionState.getSessionConf(), tTable);
   }
 
   final public Deserializer getDeserializer(boolean skipConfError) {
@@ -271,11 +269,9 @@ public class Table implements Serializab
 
   final public Deserializer getDeserializerFromMetaStore(boolean skipConfError) {
     try {
-      return MetaStoreUtils.getDeserializer(Hive.get().getConf(), tTable, skipConfError);
+      return MetaStoreUtils.getDeserializer(SessionState.getSessionConf(), tTable, skipConfError);
     } catch (MetaException e) {
       throw new RuntimeException(e);
-    } catch (HiveException e) {
-      throw new RuntimeException(e);
     }
   }
 
@@ -285,7 +281,7 @@ public class Table implements Serializab
     }
     try {
       storageHandler = HiveUtils.getStorageHandler(
-        Hive.get().getConf(),
+          SessionState.getSessionConf(),
         getProperty(
           org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE));
     } catch (Exception e) {
@@ -589,12 +585,12 @@ public class Table implements Serializab
 
     String serializationLib = getSerializationLib();
     try {
-      if (hasMetastoreBasedSchema(Hive.get().getConf(), serializationLib)) {
+      if (hasMetastoreBasedSchema(SessionState.getSessionConf(), serializationLib)) {
         return tTable.getSd().getCols();
       } else {
-        return Hive.getFieldsFromDeserializer(getTableName(), getDeserializer());
+        return MetaStoreUtils.getFieldsFromDeserializer(getTableName(), getDeserializer());
       }
-    } catch (HiveException e) {
+    } catch (Exception e) {
       LOG.error("Unable to get field from serde: " + serializationLib, e);
     }
     return new ArrayList<FieldSchema>();
@@ -625,45 +621,6 @@ public class Table implements Serializab
     return tTable.getSd().getNumBuckets();
   }
 
-  /**
-   * Replaces the directory corresponding to the table by srcf. Works by
-   * deleting the table directory and renaming the source directory.
-   *
-   * @param srcf
-   *          Source directory
-   * @param isSrcLocal
-   *          If the source directory is LOCAL
-   */
-  protected void replaceFiles(Path srcf, boolean isSrcLocal)
-      throws HiveException {
-    Path tableDest = getPath();
-    Hive.replaceFiles(tableDest, srcf, tableDest, tableDest, Hive.get().getConf(),
-        isSrcLocal);
-  }
-
-  /**
-   * Inserts files specified into the partition. Works by moving files
-   *
-   * @param srcf
-   *          Files to be moved. Leaf directories or globbed file paths
-   * @param isSrcLocal
-   *          If the source directory is LOCAL
-   * @param isAcid
-   *          True if this is an ACID based insert, update, or delete
-   * @param newFiles optional list of paths.  If non-null, then all files copyied to the table
-   *                 will be added to this list.
-   */
-  protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid, List<Path> newFiles)
-      throws HiveException {
-    FileSystem fs;
-    try {
-      fs = getDataLocation().getFileSystem(Hive.get().getConf());
-      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid, newFiles);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
-  }
-
   public void setInputFormatClass(String name) throws HiveException {
     if (name == null) {
       inputFormatClass = null;
@@ -934,22 +891,12 @@ public class Table implements Serializab
     return dbName + "@" + tabName;
   }
 
-  /**
-   * @return List containing Indexes names if there are indexes on this table
-   * @throws HiveException
-   **/
-  public List<Index> getAllIndexes(short max) throws HiveException {
-    Hive hive = Hive.get();
-    return hive.getIndexes(getTTable().getDbName(), getTTable().getTableName(), max);
-  }
-
   @SuppressWarnings("nls")
   public FileStatus[] getSortedPaths() {
     try {
       // Previously, this got the filesystem of the Table, which could be
       // different from the filesystem of the partition.
-      FileSystem fs = FileSystem.get(getPath().toUri(), Hive.get()
-          .getConf());
+      FileSystem fs = FileSystem.get(getPath().toUri(), SessionState.getSessionConf());
       String pathPattern = getPath().toString();
       if (getNumBuckets() > 0) {
         pathPattern = pathPattern + "/*";


