hive-commits mailing list archives

From: sze...@apache.org
Subject: svn commit: r1673583 [4/27] - in /hive/branches/spark: ./ beeline/src/java/org/apache/hive/beeline/ bin/ cli/src/java/org/apache/hadoop/hive/cli/ cli/src/test/org/apache/hadoop/hive/cli/ common/ common/src/java/org/apache/hadoop/hive/common/ common/src...
Date: Tue, 14 Apr 2015 23:36:09 GMT
Modified: hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java (original)
+++ hive/branches/spark/metastore/src/java/org/apache/hadoop/hive/metastore/events/AddPartitionEvent.java Tue Apr 14 23:36:02 2015
@@ -61,18 +61,18 @@ public class AddPartitionEvent extends L
     return table;
   }
 
-  /**
-   * @return List of partitions.
-   */
-  public List<Partition> getPartitions() {
-    return partitions;
-  }
+
+  // Note : List<Partition> getPartitions() removed with HIVE-9609 because it will result in OOM errors with large add_partitions.
 
   /**
    * @return Iterator for partitions.
    */
   public Iterator<Partition> getPartitionIterator() {
-    return partitionSpecProxy == null ? null : partitionSpecProxy.getPartitionIterator();
+    if (partitions != null){
+      return partitions.iterator();
+    } else {
+      return partitionSpecProxy == null ? null : partitionSpecProxy.getPartitionIterator();
+    }
   }
 
 }
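
A minimal sketch (not part of the commit) of how a metastore event listener consumes partitions after this change: iterate via getPartitionIterator() instead of asking for the full List<Partition>. The listener subclass below is hypothetical; MetaStoreEventListener, AddPartitionEvent and Partition are the existing metastore classes.

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;

public class PartitionLoggingListener extends MetaStoreEventListener {

  public PartitionLoggingListener(Configuration conf) {
    super(conf);
  }

  @Override
  public void onAddPartition(AddPartitionEvent event) throws MetaException {
    // Partitions are streamed one at a time, so a large add_partitions call no longer
    // requires holding every Partition object in memory at once.
    Iterator<Partition> it = event.getPartitionIterator();
    while (it != null && it.hasNext()) {
      Partition p = it.next();
      System.out.println("Added partition with values: " + p.getValues());
    }
  }
}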

Modified: hive/branches/spark/packaging/src/main/assembly/bin.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/packaging/src/main/assembly/bin.xml?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/packaging/src/main/assembly/bin.xml (original)
+++ hive/branches/spark/packaging/src/main/assembly/bin.xml Tue Apr 14 23:36:02 2015
@@ -146,6 +146,7 @@
       <directory>${project.parent.basedir}/conf</directory>
       <includes>
         <include>*.template</include>
+        <include>ivysettings.xml</include>
       </includes>
       <outputDirectory>conf</outputDirectory>
     </fileSet>

Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Tue Apr 14 23:36:02 2015
@@ -124,8 +124,9 @@
     <hbase.hadoop1.version>0.98.9-hadoop1</hbase.hadoop1.version>
     <hbase.hadoop2.version>0.98.9-hadoop2</hbase.hadoop2.version>
     <!-- httpcomponents are not always in version sync -->
-    <httpcomponents.client.version>4.2.5</httpcomponents.client.version>
-    <httpcomponents.core.version>4.2.5</httpcomponents.core.version>
+    <httpcomponents.client.version>4.4</httpcomponents.client.version>
+    <httpcomponents.core.version>4.4</httpcomponents.core.version>
+    <ivy.version>2.4.0</ivy.version>
     <jackson.version>1.9.2</jackson.version>
     <javaewah.version>0.3.2</javaewah.version>
     <javolution.version>5.5.1</javolution.version>
@@ -147,7 +148,7 @@
     <mockito-all.version>1.9.5</mockito-all.version>
     <mina.version>2.0.0-M5</mina.version>
     <netty.version>4.0.23.Final</netty.version>
-    <parquet.version>1.6.0rc3</parquet.version>
+    <parquet.version>1.6.0rc6</parquet.version>
     <pig.version>0.12.0</pig.version>
     <protobuf.version>2.5.0</protobuf.version>
     <stax.version>1.0.1</stax.version>
@@ -1092,6 +1093,16 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop-23.version}</version>
+            <exclusions>
+              <exclusion>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpcore</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpclient</artifactId>
+              </exclusion>
+            </exclusions>
           </dependency>
           <dependency>
             <groupId>org.apache.hadoop</groupId>

Modified: hive/branches/spark/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/pom.xml?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/pom.xml (original)
+++ hive/branches/spark/ql/pom.xml Tue Apr 14 23:36:02 2015
@@ -163,6 +163,11 @@
       <version>${libfb303.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.ivy</groupId>
+      <artifactId>ivy</artifactId>
+      <version>${ivy.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
       <version>${libthrift.version}</version>

Modified: hive/branches/spark/ql/src/gen/vectorization/ExpressionTemplates/ColumnArithmeticColumn.txt
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/gen/vectorization/ExpressionTemplates/ColumnArithmeticColumn.txt?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/gen/vectorization/ExpressionTemplates/ColumnArithmeticColumn.txt (original)
+++ hive/branches/spark/ql/src/gen/vectorization/ExpressionTemplates/ColumnArithmeticColumn.txt Tue Apr 14 23:36:02 2015
@@ -83,25 +83,27 @@ public class <ClassName> extends VectorE
     if (inputColVector1.isRepeating && inputColVector2.isRepeating) { 
       outputVector[0] = vector1[0] <OperatorSymbol> vector2[0];
     } else if (inputColVector1.isRepeating) {
+      final <OperandType1> vector1Value = vector1[0];
       if (batch.selectedInUse) {
         for(int j = 0; j != n; j++) {
           int i = sel[j];
-          outputVector[i] = vector1[0] <OperatorSymbol> vector2[i];
+          outputVector[i] = vector1Value <OperatorSymbol> vector2[i];
         }
       } else {
         for(int i = 0; i != n; i++) {
-          outputVector[i] = vector1[0] <OperatorSymbol> vector2[i];
+          outputVector[i] = vector1Value <OperatorSymbol> vector2[i];
         }
       }
     } else if (inputColVector2.isRepeating) {
+      final <OperandType2> vector2Value = vector2[0];
       if (batch.selectedInUse) {
         for(int j = 0; j != n; j++) {
           int i = sel[j];
-          outputVector[i] = vector1[i] <OperatorSymbol> vector2[0];
+          outputVector[i] = vector1[i] <OperatorSymbol> vector2Value;
         }
       } else {
         for(int i = 0; i != n; i++) {
-          outputVector[i] = vector1[i] <OperatorSymbol> vector2[0];
+          outputVector[i] = vector1[i] <OperatorSymbol> vector2Value;
         }
       }
     } else {
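
An illustrative (hypothetical) expansion of the amended template for a long-plus-long column expression: hoisting the repeating operand vector1[0] into a local final variable reads the array element once instead of on every loop iteration.

public class RepeatingOperandHoisting {

  static void addRepeatingLeft(long[] vector1, long[] vector2, long[] outputVector,
      int[] sel, boolean selectedInUse, int n) {
    final long vector1Value = vector1[0];  // read the repeating operand once
    if (selectedInUse) {
      for (int j = 0; j != n; j++) {
        int i = sel[j];
        outputVector[i] = vector1Value + vector2[i];
      }
    } else {
      for (int i = 0; i != n; i++) {
        outputVector[i] = vector1Value + vector2[i];
      }
    }
  }

  public static void main(String[] args) {
    long[] v1 = {10, 0, 0};   // an isRepeating vector: only index 0 is meaningful
    long[] v2 = {1, 2, 3};
    long[] out = new long[3];
    addRepeatingLeft(v1, v2, out, null, false, 3);
    System.out.println(java.util.Arrays.toString(out));  // prints [11, 12, 13]
  }
}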

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java Tue Apr 14 23:36:02 2015
@@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -84,6 +85,7 @@ public class Context {
   private final Configuration conf;
   protected int pathid = 10000;
   protected boolean explain = false;
+  protected String cboInfo;
   protected boolean explainLogical = false;
   protected String cmd = "";
   // number of previous attempts
@@ -695,4 +697,13 @@ public class Context {
   public AcidUtils.Operation getAcidOperation() {
     return acidOperation;
   }
+
+  public String getCboInfo() {
+    return cboInfo;
+  }
+
+  public void setCboInfo(String cboInfo) {
+    this.cboInfo = cboInfo;
+  }
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Apr 14 23:36:02 2015
@@ -63,7 +63,6 @@ import org.apache.hadoop.hive.ql.hooks.H
 import org.apache.hadoop.hive.ql.hooks.PostExecute;
 import org.apache.hadoop.hive.ql.hooks.PreExecute;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
-import org.apache.hadoop.hive.ql.hooks.Redactor;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
@@ -485,7 +484,6 @@ public class Driver implements CommandPr
               + explainOutput);
         }
       }
-
       return 0;
     } catch (Exception e) {
       ErrorMsg error = ErrorMsg.getErrorMsg(e.getMessage());
@@ -508,10 +506,19 @@ public class Driver implements CommandPr
       return error.getErrorCode();
     } finally {
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.COMPILE);
+      dumpMetaCallTimingWithoutEx("compilation");
       restoreSession(queryState);
     }
   }
 
+  private void dumpMetaCallTimingWithoutEx(String phase) {
+    try {
+      Hive.get().dumpAndClearMetaCallTiming(phase);
+    } catch (HiveException he) {
+      LOG.warn("Caught exception attempting to write metadata call information " + he, he);
+    }
+  }
+
   /**
    * Returns EXPLAIN EXTENDED output for a semantically
    * analyzed query.
@@ -1182,7 +1189,6 @@ public class Driver implements CommandPr
         return createProcessorResponse(ret);
       }
     }
-
     ret = execute();
     if (ret != 0) {
       //if needRequireLock is false, the release here will do nothing because there is no lock
@@ -1307,7 +1313,6 @@ public class Driver implements CommandPr
   public int execute() throws CommandNeedRetryException {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
-
     boolean noName = StringUtils.isEmpty(conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME));
     int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
 
@@ -1318,6 +1323,9 @@ public class Driver implements CommandPr
 
     try {
       LOG.info("Starting command: " + queryStr);
+      // compile and execute can get called from different threads in case of HS2
+      // so clear timing in this thread's Hive object before proceeding.
+      Hive.get().clearMetaCallTiming();
 
       plan.setStarted();
 
@@ -1548,6 +1556,7 @@ public class Driver implements CommandPr
       if (noName) {
         conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, "");
       }
+      dumpMetaCallTimingWithoutEx("execution");
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
 
       Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Tue Apr 14 23:36:02 2015
@@ -429,6 +429,7 @@ public enum ErrorMsg {
       "Alter table partition type {0} does not support cascade", true),
 
   DROP_NATIVE_FUNCTION(10301, "Cannot drop native function"),
+  UPDATE_CANNOT_UPDATE_BUCKET_VALUE(10302, "Updating values of bucketing columns is not supported.  Column {0}.", true),
 
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
@@ -443,6 +444,9 @@ public enum ErrorMsg {
       "is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "),
   PARTITION_SCAN_LIMIT_EXCEEDED(20005, "Number of partitions scanned (={0}) on table {1} exceeds limit" +
       " (={2}). This is controlled by hive.limit.query.max.table.partition.", true),
+  OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState?
+  OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction.  TransactionID={1}.", true),
+  OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
 
   //========================== 30000 range starts here ========================//
   STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
@@ -508,7 +512,7 @@ public enum ErrorMsg {
   static {
     for (ErrorMsg errorMsg : values()) {
       if (errorMsg.format != null) {
-        String pattern = errorMsg.mesg.replaceAll("\\{.*\\}", ".*");
+        String pattern = errorMsg.mesg.replaceAll("\\{[0-9]+\\}", ".*");
         formatToErrorMsgMap.put(Pattern.compile("^" + pattern + "$"), errorMsg);
       } else {
         mesgToErrorMsgMap.put(errorMsg.getMsg().trim(), errorMsg);
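
A self-contained sketch (the sample message is hypothetical) of why the narrower placeholder pattern matters: with two placeholders in one message, the greedy "\{.*\}" swallows everything from the first '{' to the last '}', while "\{[0-9]+\}" only rewrites the numbered placeholders.

import java.util.regex.Pattern;

public class PlaceholderPatternDemo {
  public static void main(String[] args) {
    String mesg = "Operation {0} is not allowed in a transaction.  TransactionID={1}.";

    String broad  = mesg.replaceAll("\\{.*\\}", ".*");
    String narrow = mesg.replaceAll("\\{[0-9]+\\}", ".*");

    System.out.println(broad);   // Operation .*.   (so broad it also matches unrelated messages)
    System.out.println(narrow);  // Operation .* is not allowed in a transaction.  TransactionID=.*.

    String concrete = "Operation COMMIT is not allowed in a transaction.  TransactionID=42.";
    System.out.println(Pattern.compile("^" + narrow + "$").matcher(concrete).matches());  // true
  }
}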

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Tue Apr 14 23:36:02 2015
@@ -691,11 +691,11 @@ public abstract class CommonJoinOperator
         Byte alias = order[i];
         AbstractRowContainer<List<Object>> alw = storage[alias];
 
-        if (alw.rowCount() != 1) {
+        if (!alw.isSingleRow()) {
           allOne = false;
         }
 
-        if (alw.rowCount() == 0) {
+        if (!alw.hasRows()) {
           alw.addRow(dummyObj[i]);
           hasNulls = true;
         } else if (condn[i].getPreserved()) {
@@ -721,16 +721,16 @@ public abstract class CommonJoinOperator
         AbstractRowContainer<List<Object>> alw = storage[alias];
 
         if (noOuterJoin) {
-          if (alw.rowCount() == 0) {
+          if (!alw.hasRows()) {
             return;
-          } else if (alw.rowCount() > 1) {
+          } else if (!alw.isSingleRow()) {
             mayHasMoreThanOne = true;
           }
         } else {
-          if (alw.rowCount() == 0) {
+          if (!alw.hasRows()) {
             hasEmpty = true;
             alw.addRow(dummyObj[i]);
-          } else if (!hasEmpty && alw.rowCount() == 1) {
+          } else if (!hasEmpty && alw.isSingleRow()) {
             if (hasAnyFiltered(alias, alw.rowIter().first())) {
               hasEmpty = true;
             }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Tue Apr 14 23:36:02 2015
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.te
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -78,6 +79,8 @@ public class CommonMergeJoinOperator ext
   transient List<Object> otherKey = null;
   transient List<Object> values = null;
   transient RecordSource[] sources;
+  transient WritableComparator[][] keyComparators;
+
   transient List<Operator<? extends OperatorDesc>> originalParents =
       new ArrayList<Operator<? extends OperatorDesc>>();
 
@@ -105,6 +108,11 @@ public class CommonMergeJoinOperator ext
     nextKeyWritables = new ArrayList[maxAlias];
     fetchDone = new boolean[maxAlias];
     foundNextKeyGroup = new boolean[maxAlias];
+    keyComparators = new WritableComparator[maxAlias][];
+
+    for (Entry<Byte, List<ExprNodeDesc>> entry : conf.getKeys().entrySet()) {
+      keyComparators[entry.getKey().intValue()] = new WritableComparator[entry.getValue().size()];
+    }
 
     int bucketSize;
 
@@ -279,7 +287,7 @@ public class CommonMergeJoinOperator ext
         result[pos] = -1;
         continue;
       }
-      result[pos] = compareKeys(key, smallestOne);
+      result[pos] = compareKeys(pos, key, smallestOne);
       if (result[pos] < 0) {
         smallestOne = key;
       }
@@ -383,7 +391,7 @@ public class CommonMergeJoinOperator ext
         if (candidateStorage[pos] == null) {
           continue;
         }
-        if (this.candidateStorage[pos].rowCount() > 0) {
+        if (this.candidateStorage[pos].hasRows()) {
           dataInCache = true;
           break;
         }
@@ -411,14 +419,16 @@ public class CommonMergeJoinOperator ext
     this.nextGroupStorage[t] = oldRowContainer;
   }
 
+  @SuppressWarnings("rawtypes")
   private boolean processKey(byte alias, List<Object> key) throws HiveException {
     List<Object> keyWritable = keyWritables[alias];
     if (keyWritable == null) {
       // the first group.
       keyWritables[alias] = key;
+      keyComparators[alias] = new WritableComparator[key.size()];
       return false;
     } else {
-      int cmp = compareKeys(key, keyWritable);
+      int cmp = compareKeys(alias, key, keyWritable);
       if (cmp != 0) {
         nextKeyWritables[alias] = key;
         return true;
@@ -428,30 +438,42 @@ public class CommonMergeJoinOperator ext
   }
 
   @SuppressWarnings("rawtypes")
-  private int compareKeys(List<Object> k1, List<Object> k2) {
-    int ret = 0;
+  private int compareKeys(byte alias, List<Object> k1, List<Object> k2) {
+    final WritableComparator[] comparators = keyComparators[alias];
 
     // join keys have different sizes?
-    ret = k1.size() - k2.size();
-    if (ret != 0) {
-      return ret;
+    if (k1.size() != k2.size()) {
+      return k1.size() - k2.size();
+    }
+
+    if (comparators.length == 0) {
+      // cross-product - no keys really
+      return 0;
     }
 
-    for (int i = 0; i < k1.size(); i++) {
+    if (comparators.length > 1) {
+      // rare case
+      return compareKeysMany(comparators, k1, k2);
+    } else {
+      return compareKey(comparators, 0,
+          (WritableComparable) k1.get(0),
+          (WritableComparable) k2.get(0),
+          nullsafes != null ? nullsafes[0]: false);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private int compareKeysMany(WritableComparator[] comparators,
+      final List<Object> k1,
+      final List<Object> k2) {
+    // invariant: k1.size == k2.size
+    int ret = 0;
+    final int size = k1.size();
+    for (int i = 0; i < size; i++) {
       WritableComparable key_1 = (WritableComparable) k1.get(i);
       WritableComparable key_2 = (WritableComparable) k2.get(i);
-      if (key_1 == null && key_2 == null) {
-        if (nullsafes != null && nullsafes[i]) {
-          continue;
-        } else {
-          return -1;
-        }
-      } else if (key_1 == null) {
-        return -1;
-      } else if (key_2 == null) {
-        return 1;
-      }
-      ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2);
+      ret = compareKey(comparators, i, key_1, key_2,
+          nullsafes != null ? nullsafes[i] : false);
       if (ret != 0) {
         return ret;
       }
@@ -459,6 +481,30 @@ public class CommonMergeJoinOperator ext
     return ret;
   }
 
+  @SuppressWarnings("rawtypes")
+  private int compareKey(final WritableComparator comparators[], final int pos,
+      final WritableComparable key_1,
+      final WritableComparable key_2,
+      final boolean nullsafe) {
+
+    if (key_1 == null && key_2 == null) {
+      if (nullsafe) {
+        return 0;
+      } else {
+        return -1;
+      }
+    } else if (key_1 == null) {
+      return -1;
+    } else if (key_2 == null) {
+      return 1;
+    }
+
+    if (comparators[pos] == null) {
+      comparators[pos] = WritableComparator.get(key_1.getClass());
+    }
+    return comparators[pos].compare(key_1, key_2);
+  }
+
   @SuppressWarnings("unchecked")
   private List<Object> mergeJoinComputeKeys(Object row, Byte alias) throws HiveException {
     if ((joinKeysObjectInspectors != null) && (joinKeysObjectInspectors[alias] != null)) {
@@ -501,12 +547,13 @@ public class CommonMergeJoinOperator ext
     if (parent == null) {
       throw new HiveException("No valid parents.");
     }
-    Map<Integer, DummyStoreOperator> dummyOps = parent.getTagToOperatorTree();
+    Map<Integer, DummyStoreOperator> dummyOps =
+        ((TezContext) (MapredContext.get())).getDummyOpsMap();
     for (Entry<Integer, DummyStoreOperator> connectOp : dummyOps.entrySet()) {
       if (connectOp.getValue().getChildOperators() == null
-	  || connectOp.getValue().getChildOperators().isEmpty()) {
-	parentOperators.add(connectOp.getKey(), connectOp.getValue());
-	connectOp.getValue().getChildOperators().add(this);
+          || connectOp.getValue().getChildOperators().isEmpty()) {
+        parentOperators.add(connectOp.getKey(), connectOp.getValue());
+        connectOp.getValue().getChildOperators().add(this);
       }
     }
     super.initializeLocalWork(hconf);
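
A hedged micro-example (class and keys are hypothetical) of the comparator-caching idea above: resolve the WritableComparator for each key position once and reuse it for every row comparison, instead of calling WritableComparator.get() per key per row.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CachedKeyCompare {

  private final WritableComparator[] comparators = new WritableComparator[2];

  @SuppressWarnings({"rawtypes", "unchecked"})
  int compareAt(int pos, WritableComparable a, WritableComparable b) {
    if (comparators[pos] == null) {
      // WritableComparator.get() does a registry lookup; caching avoids repeating it per row.
      comparators[pos] = WritableComparator.get(a.getClass());
    }
    return comparators[pos].compare(a, b);
  }

  public static void main(String[] args) {
    CachedKeyCompare c = new CachedKeyCompare();
    System.out.println(c.compareAt(0, new LongWritable(1), new LongWritable(2)));  // negative
    System.out.println(c.compareAt(1, new Text("a"), new Text("a")));              // 0
  }
}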

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Tue Apr 14 23:36:02 2015
@@ -3404,13 +3404,13 @@ public class DDLTask extends Task<DDLWor
 
   private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, Partition part)
       throws HiveException {
-    List<FieldSchema> oldCols = (part == null ? tbl.getCols() : part.getCols());
-    StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
 
     if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) {
       tbl.setDbName(Utilities.getDatabaseName(alterTbl.getNewName()));
       tbl.setTableName(Utilities.getTableName(alterTbl.getNewName()));
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDCOLS) {
+      List<FieldSchema> oldCols = (part == null ? tbl.getCols() : part.getCols());
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       List<FieldSchema> newCols = alterTbl.getNewCols();
       String serializationLib = sd.getSerdeInfo().getSerializationLib();
       if (serializationLib.equals(
@@ -3437,6 +3437,8 @@ public class DDLTask extends Task<DDLWor
         sd.setCols(oldCols);
       }
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAMECOLUMN) {
+      List<FieldSchema> oldCols = (part == null ? tbl.getCols() : part.getCols());
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       List<FieldSchema> newCols = new ArrayList<FieldSchema>();
       Iterator<FieldSchema> iterOldCols = oldCols.iterator();
       String oldName = alterTbl.getOldColName();
@@ -3499,6 +3501,7 @@ public class DDLTask extends Task<DDLWor
 
       sd.setCols(newCols);
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.REPLACECOLS) {
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       // change SerDe to LazySimpleSerDe if it is columnsetSerDe
       String serializationLib = sd.getSerdeInfo().getSerializationLib();
       if (serializationLib.equals(
@@ -3523,8 +3526,10 @@ public class DDLTask extends Task<DDLWor
         tbl.getTTable().getParameters().remove(keyItr.next());
       }
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDEPROPS) {
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps());
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDE) {
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       String serdeName = alterTbl.getSerdeName();
       sd.getSerdeInfo().setSerializationLib(serdeName);
       if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0)) {
@@ -3539,6 +3544,7 @@ public class DDLTask extends Task<DDLWor
         }
       }
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDFILEFORMAT) {
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       sd.setInputFormat(alterTbl.getInputFormat());
       sd.setOutputFormat(alterTbl.getOutputFormat());
       if (alterTbl.getSerdeName() != null) {
@@ -3559,6 +3565,7 @@ public class DDLTask extends Task<DDLWor
         tbl.setProtectMode(mode);
       }
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDCLUSTERSORTCOLUMN) {
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       // validate sort columns and bucket columns
       List<String> columns = Utilities.getColumnNamesFromFieldSchema(tbl
           .getCols());
@@ -3583,6 +3590,7 @@ public class DDLTask extends Task<DDLWor
         sd.setSortCols(alterTbl.getSortColumns());
       }
     } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ALTERLOCATION) {
+      StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd());
       String newLocation = alterTbl.getNewLocation();
       try {
         URI locUri = new URI(newLocation);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Tue Apr 14 23:36:02 2015
@@ -40,6 +40,8 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.jsonexplain.JsonParser;
+import org.apache.hadoop.hive.common.jsonexplain.JsonParserFactory;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -47,7 +49,9 @@ import org.apache.hadoop.hive.ql.hooks.R
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -284,10 +288,30 @@ public class ExplainTask extends Task<Ex
         JSONObject jsonDependencies = getJSONDependencies(work);
         out.print(jsonDependencies);
       } else {
-	JSONObject jsonPlan = getJSONPlan(out, work);
-	if (work.isFormatted()) {
-	  out.print(jsonPlan);
-	}
+        if (work.getDependency()) {
+          JSONObject jsonDependencies = getJSONDependencies(work);
+          out.print(jsonDependencies);
+        } else {
+          if (work.isUserLevelExplain()) {
+            JsonParser jsonParser = JsonParserFactory.getParser(conf);
+            if (jsonParser != null) {
+              work.setFormatted(true);
+              JSONObject jsonPlan = getJSONPlan(out, work);
+              if (work.getCboInfo() != null) {
+                jsonPlan.put("cboInfo", work.getCboInfo());
+              }
+              jsonParser.print(jsonPlan, out);
+            } else {
+              throw new SemanticException(
+                  "Hive UserLevelExplain only supports tez engine right now.");
+            }
+          } else {
+            JSONObject jsonPlan = getJSONPlan(out, work);
+            if (work.isFormatted()) {
+              out.print(jsonPlan);
+            }
+          }
+        }
       }
 
       out.close();
@@ -561,7 +585,17 @@ public class ExplainTask extends Task<Ex
 
     if (note instanceof Explain) {
       Explain xpl_note = (Explain) note;
-      if (extended || xpl_note.normalExplain()) {
+      boolean invokeFlag = false;
+      if (this.work.isUserLevelExplain()) {
+        invokeFlag = Level.USER.in(xpl_note.explainLevels());
+      } else {
+        if (extended) {
+          invokeFlag = Level.EXTENDED.in(xpl_note.explainLevels());
+        } else {
+          invokeFlag = Level.DEFAULT.in(xpl_note.explainLevels());
+        }
+      }
+      if (invokeFlag) {
         keyJSONObject = xpl_note.displayName();
         if (out != null) {
           out.print(indentString(indent));
@@ -584,6 +618,12 @@ public class ExplainTask extends Task<Ex
         String appender = isLogical ? " (" + operator.getOperatorId() + ")" : "";
         JSONObject jsonOut = outputPlan(operator.getConf(), out, extended,
             jsonOutput, jsonOutput ? 0 : indent, appender);
+        if (this.work.isUserLevelExplain()) {
+          if (jsonOut != null && jsonOut.length() > 0) {
+            ((JSONObject) jsonOut.get(JSONObject.getNames(jsonOut)[0])).put("OperatorId:",
+                operator.getOperatorId());
+          }
+        }
         if (jsonOutput) {
             json = jsonOut;
         }
@@ -618,8 +658,17 @@ public class ExplainTask extends Task<Ex
 
       if (note instanceof Explain) {
         Explain xpl_note = (Explain) note;
-
-        if (extended || xpl_note.normalExplain()) {
+        boolean invokeFlag = false;
+        if (this.work.isUserLevelExplain()) {
+          invokeFlag = Level.USER.in(xpl_note.explainLevels());
+        } else {
+          if (extended) {
+            invokeFlag = Level.EXTENDED.in(xpl_note.explainLevels());
+          } else {
+            invokeFlag = Level.DEFAULT.in(xpl_note.explainLevels());
+          }
+        }
+        if (invokeFlag) {
 
           Object val = null;
           try {
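
A hedged sketch of how a plan descriptor opts into the new explain levels. The enclosing class and getters are hypothetical; @Explain, displayName and explainLevels follow the usage visible in the diff above (Level.USER for the new user-level explain, plus DEFAULT and EXTENDED).

import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.Explain.Level;

public class ExampleDesc {

  // Rendered by plain EXPLAIN, EXPLAIN EXTENDED, and the user-level explain.
  @Explain(displayName = "Example Detail",
      explainLevels = { Level.DEFAULT, Level.EXTENDED, Level.USER })
  public String getExampleDetail() {
    return "example";
  }

  // Rendered only when EXPLAIN EXTENDED is requested.
  @Explain(displayName = "Verbose Detail", explainLevels = { Level.EXTENDED })
  public String getVerboseDetail() {
    return "verbose";
  }
}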

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Tue Apr 14 23:36:02 2015
@@ -273,6 +273,7 @@ public final class FunctionRegistry {
     system.registerGenericUDF("date_sub", GenericUDFDateSub.class);
     system.registerGenericUDF("datediff", GenericUDFDateDiff.class);
     system.registerGenericUDF("add_months", GenericUDFAddMonths.class);
+    system.registerGenericUDF("months_between", GenericUDFMonthsBetween.class);
 
     system.registerUDF("get_json_object", UDFJson.class, false);
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Tue Apr 14 23:36:02 2015
@@ -498,26 +498,26 @@ public class MapJoinOperator extends Abs
   private void reloadHashTable(HashPartition partition,
                                HybridHashTableContainer hybridHtContainer)
       throws IOException, ClassNotFoundException, HiveException, SerDeException {
-    // Deserialize the on-disk hash table
-    // We're sure this part is smaller than memory limit
-    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk();
-    int rowCount = restoredHashMap.getNumValues();
-    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
-    LOG.info("Hybrid Grace Hash Join: Number of rows restored from hashmap: " + rowCount);
 
     // Merge the sidefile into the newly created hash table
     // This is where the spilling may happen again
     KeyValueContainer kvContainer = partition.getSidefileKVContainer();
-    rowCount += kvContainer.size();
+    int rowCount = kvContainer.size();
     LOG.info("Hybrid Grace Hash Join: Number of rows restored from KeyValueContainer: " +
         kvContainer.size());
 
+    // Deserialize the on-disk hash table
+    // We're sure this part is smaller than memory limit
+    BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(rowCount);
+    rowCount += restoredHashMap.getNumValues();
+    LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
+    LOG.info("Hybrid Grace Hash Join: Number of rows in hashmap: " + rowCount);
+
     // If based on the new key count, keyCount is smaller than a threshold,
     // then just load the entire restored hashmap into memory.
     // The size of deserialized partition shouldn't exceed half of memory limit
     if (rowCount * hybridHtContainer.getTableRowSize() >= hybridHtContainer.getMemoryThreshold() / 2) {
-      throw new RuntimeException("Hybrid Grace Hash Join: Hash table cannot be reloaded since it" +
-          " will be greater than memory limit. Recursive spilling is currently not supported");
+      LOG.info("Hybrid Grace Hash Join: Hash table reload can fail since it will be greater than memory limit. Recursive spilling is currently not supported");
     }
 
     KeyValueHelper writeHelper = hybridHtContainer.getWriteHelper();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Tue Apr 14 23:36:02 2015
@@ -637,11 +637,6 @@ public class MapOperator extends Operato
     return null;
   }
 
-  @Override
-  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
-    return MapRecordProcessor.getConnectOps();
-  }
-
   public void initializeContexts() {
     Path fpath = getExecContext().getCurrentInputPath();
     String nominalPath = getNominalPath(fpath);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Apr 14 23:36:02 2015
@@ -1350,12 +1350,4 @@ public abstract class Operator<T extends
       return childOperators;
     }
   }
-
-  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
-    if ((parentOperators == null) || (parentOperators.size() == 0)) {
-      return null;
-    }
-    Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree();
-    return dummyOps;
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Apr 14 23:36:02 2015
@@ -342,7 +342,7 @@ public class SMBMapJoinOperator extends
       joinOneGroup();
       dataInCache = false;
       for (byte pos = 0; pos < order.length; pos++) {
-        if (this.candidateStorage[pos].rowCount() > 0) {
+        if (this.candidateStorage[pos].hasRows()) {
           dataInCache = true;
           break;
         }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SecureCmdDoAs.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SecureCmdDoAs.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SecureCmdDoAs.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SecureCmdDoAs.java Tue Apr 14 23:36:02 2015
@@ -45,14 +45,14 @@ public class SecureCmdDoAs {
     // metastore tokens into a file
     String uname = UserGroupInformation.getLoginUser().getShortUserName();
     FileSystem fs = FileSystem.get(conf);
-    Token<?> fsToken = fs.getDelegationToken(uname);
+    Credentials cred = new Credentials();
+    // Use method addDelegationTokens instead of getDelegationToken to get all the tokens including KMS.
+    fs.addDelegationTokens(uname, cred);
 
     tokenFile = File.createTempFile("hive_hadoop_delegation_token", null);
     tokenPath = new Path(tokenFile.toURI());
 
     //write credential with token to file
-    Credentials cred = new Credentials();
-    cred.addToken(fsToken.getService(), fsToken);
     cred.writeTokenStorageFile(tokenPath, conf);
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Tue Apr 14 23:36:02 2015
@@ -187,6 +187,7 @@ public abstract class Task<T extends Ser
     this.childTasks = childTasks;
   }
 
+  @Override
   public List<? extends Node> getChildren() {
     return getChildTasks();
   }
@@ -521,7 +522,7 @@ public abstract class Task<T extends Ser
     return exception;
   }
 
-  void setException(Throwable ex) {
+  protected void setException(Throwable ex) {
     exception = ex;
   }
 
@@ -542,10 +543,12 @@ public abstract class Task<T extends Ser
     return getId() + ":" + getType();
   }
 
+  @Override
   public int hashCode() {
     return toString().hashCode();
   }
 
+  @Override
   public boolean equals(Object obj) {
     return toString().equals(String.valueOf(obj));
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Apr 14 23:36:02 2015
@@ -46,6 +46,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLTransientException;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
@@ -211,6 +212,8 @@ public final class Utilities {
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
   public static final String HIVE_ADDED_JARS = "hive.added.jars";
+  public static String MAPNAME = "Map ";
+  public static String REDUCENAME = "Reducer ";
 
   /**
    * ReduceField:
@@ -242,6 +245,7 @@ public final class Utilities {
 
   private static ThreadLocal<Map<Path, BaseWork>> gWorkMap =
       new ThreadLocal<Map<Path, BaseWork>>() {
+    @Override
     protected Map<Path, BaseWork> initialValue() {
       return new HashMap<Path, BaseWork>();
     }
@@ -307,12 +311,13 @@ public final class Utilities {
   public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
       boolean useCache) {
     for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) {
-      setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache);
+      String prefix = baseWork.getName();
+      setBaseWork(conf, baseWork, mrScratchDir, prefix + MERGE_PLAN_NAME, useCache);
       String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
       if (prefixes == null) {
-        prefixes = baseWork.getName();
+        prefixes = prefix;
       } else {
-        prefixes = prefixes + "," + baseWork.getName();
+        prefixes = prefixes + "," + prefix;
       }
       conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes);
     }
@@ -432,7 +437,13 @@ public final class Utilities {
                 + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
           }
         } else if (name.contains(MERGE_PLAN_NAME)) {
-          gWork = deserializePlan(in, MapWork.class, conf);
+          if (name.startsWith(MAPNAME)) {
+            gWork = deserializePlan(in, MapWork.class, conf);
+          } else if (name.startsWith(REDUCENAME)) {
+            gWork = deserializePlan(in, ReduceWork.class, conf);
+          } else {
+            throw new RuntimeException("Unknown work type: " + name);
+          }
         }
         gWorkMap.get().put(path, gWork);
       } else if (LOG.isDebugEnabled()) {
@@ -457,9 +468,9 @@ public final class Utilities {
     }
   }
 
-  public static Map<String, Map<Integer, String>> getMapWorkAllScratchColumnVectorTypeMaps(Configuration hiveConf) {
+  public static Map<Integer, String> getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) {
     MapWork mapWork = getMapWork(hiveConf);
-    return mapWork.getAllScratchColumnVectorTypeMaps();
+    return mapWork.getVectorScratchColumnTypeMap();
   }
 
   public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
@@ -3104,6 +3115,24 @@ public final class Utilities {
     }
   }
 
+  public static void setQueryTimeout(java.sql.Statement stmt, int timeout) throws SQLException {
+    if (timeout < 0) {
+      LOG.info("Invalid query timeout " + timeout);
+      return;
+    }
+    try {
+      stmt.setQueryTimeout(timeout);
+    } catch (SQLException e) {
+      String message = e.getMessage() == null ? null : e.getMessage().toLowerCase();
+      if (e instanceof SQLFeatureNotSupportedException ||
+         (message != null && (message.contains("implemented") || message.contains("supported")))) {
+        LOG.info("setQueryTimeout is not supported");
+        return;
+      }
+      throw e;
+    }
+  }
+
   /**
    * Introducing a random factor to the wait time before another retry.
    * The wait time is dependent on # of failures and a random factor.
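
A usage sketch for the new Utilities.setQueryTimeout helper (the embedded Derby connection is a hypothetical stand-in for whatever JDBC statement the caller holds): negative timeouts are logged and ignored, and drivers that do not implement setQueryTimeout no longer fail the caller.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.hive.ql.exec.Utilities;

public class QueryTimeoutExample {
  public static void main(String[] args) throws SQLException {
    try (Connection conn = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
         Statement stmt = conn.createStatement()) {
      // Apply a 30 second timeout; unsupported drivers are tolerated inside the helper.
      Utilities.setQueryTimeout(stmt, 30);
      stmt.execute("VALUES 1");
    }
  }
}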

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java Tue Apr 14 23:36:02 2015
@@ -37,6 +37,17 @@ public interface AbstractRowContainer<RO
   public void addRow(ROW t) throws HiveException;
 
   /**
+   * @return whether the row container has at least 1 row.
+   * NOTE: Originally we named this isEmpty, but that name conflicted with another interface.
+   */
+  public boolean hasRows() throws HiveException;
+
+  /**
+   * @return whether the row container has 1 row.
+   */
+  public boolean isSingleRow() throws HiveException;
+
+  /**
    * @return number of elements in the RowContainer
    */
   public int rowCount() throws HiveException;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Tue Apr 14 23:36:02 2015
@@ -146,7 +146,7 @@ public final class BytesBytesMultiHashMa
   private long[] refs;
   private int startingHashBitCount, hashBitCount;
 
-  private int metricPutConflict = 0, metricExpands = 0, metricExpandsUs = 0;
+  private int metricPutConflict = 0, metricGetConflict = 0, metricExpands = 0, metricExpandsMs = 0;
 
   /** We have 39 bits to store list pointer from the first record; this is size limit */
   final static long MAX_WB_SIZE = ((long)1) << 38;
@@ -184,6 +184,240 @@ public final class BytesBytesMultiHashMa
     this(initialCapacity, loadFactor, wbSize, -1);
   }
 
+  public class ThreadSafeGetter {
+    private WriteBuffers.Position position = new WriteBuffers.Position();
+    public byte getValueResult(byte[] key, int offset, int length,
+            BytesBytesMultiHashMap.Result hashMapResult) {
+      return BytesBytesMultiHashMap.this.getValueResult(key, offset, length, hashMapResult, position);
+    }
+
+    public void populateValue(WriteBuffers.ByteSegmentRef valueRef) {
+      // Convenience method, populateValue is thread-safe.
+      BytesBytesMultiHashMap.this.populateValue(valueRef);
+    }
+  }
+
+  /**
+   * The result of looking up a key in the multi-hash map.
+   *
+   * This object can read through the 0, 1, or more values found for the key.
+   */
+  public static class Result {
+
+    // Whether there are more than 0 rows.
+    private boolean hasRows;
+
+    // We need a pointer to the hash map since this class must be static to support having
+    // multiple hash tables with Hybrid Grace partitioning.
+    private BytesBytesMultiHashMap hashMap;
+
+    // And, a mutable read position for thread safety when sharing a hash map.
+    private WriteBuffers.Position readPos;
+
+    // These values come from setValueResult when it finds a key.  These values allow this
+    // class to read (and re-read) the values.
+    private long firstOffset;
+    private boolean hasList;
+    private long offsetAfterListRecordKeyLen;
+
+    // When we have multiple values, we save the next value record's offset here.
+    private long nextTailOffset;
+
+    // 0-based index of which row we are on.
+    private long readIndex;
+
+    // A reference to the current row.
+    private WriteBuffers.ByteSegmentRef byteSegmentRef;
+
+    public Result() {
+      hasRows = false;
+      byteSegmentRef = new WriteBuffers.ByteSegmentRef();
+    }
+
+    /**
+     * @return Whether there are 1 or more values.
+     */
+    public boolean hasRows() {
+      // NOTE: Originally we named this isEmpty, but that name conflicted with another interface.
+      return hasRows;
+    }
+
+    /**
+     * @return Whether there is just 1 value row.
+     */
+    public boolean isSingleRow() {
+      return !hasList;
+    }
+
+    /**
+     * Set internal values for reading the values after finding a key.
+     *
+     * @param hashMap
+     *          The hash map we found the key in.
+     * @param firstOffset
+     *          The absolute offset of the first record in the write buffers.
+     * @param hasList
+     *          Whether there are multiple values (true) or just a single value (false).
+     * @param offsetAfterListRecordKeyLen
+     *          The offset of just after the key length in the list record.  Or, 0 when single row.
+     * @param readPos
+     *          Holds mutable read position for thread safety.
+     */
+    public void set(BytesBytesMultiHashMap hashMap, long firstOffset, boolean hasList,
+        long offsetAfterListRecordKeyLen, WriteBuffers.Position readPos) {
+
+      this.hashMap = hashMap;
+      this.readPos = readPos;
+
+      this.firstOffset = firstOffset;
+      this.hasList = hasList;
+      this.offsetAfterListRecordKeyLen = offsetAfterListRecordKeyLen;
+
+      // Position at first row.
+      readIndex = 0;
+      nextTailOffset = -1;
+
+      hasRows = true;
+    }
+
+    public WriteBuffers.ByteSegmentRef first() {
+      if (!hasRows) {
+        return null;
+      }
+
+      // Position at first row.
+      readIndex = 0;
+      nextTailOffset = -1;
+
+      return internalRead();
+    }
+
+    public WriteBuffers.ByteSegmentRef next() {
+      if (!hasRows) {
+        return null;
+      }
+
+      return internalRead();
+    }
+
+    /**
+     * Read the current value.  
+     * 
+     * @return
+     *           The ByteSegmentRef to the current value read.
+     */
+    private WriteBuffers.ByteSegmentRef internalRead() {
+
+      if (!hasList) {
+
+        /*
+         * Single value.
+         */
+
+        if (readIndex > 0) {
+          return null;
+        }
+
+        // For a non-list (i.e. single value), the offset is for the variable length long (VLong)
+        // holding the value length (followed by the key length).
+        hashMap.writeBuffers.setReadPoint(firstOffset, readPos);
+        int valueLength = (int) hashMap.writeBuffers.readVLong(readPos);
+
+        // The value is before the offset.  Make byte segment reference absolute.
+        byteSegmentRef.reset(firstOffset - valueLength, valueLength);
+        hashMap.writeBuffers.populateValue(byteSegmentRef);
+
+        readIndex++;
+        return byteSegmentRef;
+      }
+
+      /*
+       * Multiple values.
+       */
+
+      if (readIndex == 0) {
+        // For a list, the value and key lengths of 1st record were overwritten with the
+        // relative offset to a new list record.
+        long relativeOffset = hashMap.writeBuffers.readNByteLong(firstOffset, 5, readPos);
+
+        // At the beginning of the list record will be the value length.
+        hashMap.writeBuffers.setReadPoint(firstOffset + relativeOffset, readPos);
+        int valueLength = (int) hashMap.writeBuffers.readVLong(readPos);
+
+        // The value is before the list record offset.  Make byte segment reference absolute.
+        byteSegmentRef.reset(firstOffset - valueLength, valueLength);
+        hashMap.writeBuffers.populateValue(byteSegmentRef);
+
+        readIndex++;
+        return byteSegmentRef;
+      }
+
+      if (readIndex == 1) {
+        // We remembered the offset of just after the key length in the list record.
+        // Read the absolute offset to the 2nd value.
+        nextTailOffset = hashMap.writeBuffers.readNByteLong(offsetAfterListRecordKeyLen, 5, readPos);
+        if (nextTailOffset <= 0) {
+          throw new Error("Expecting a second value");
+        }
+      } else if (nextTailOffset <= 0) {
+        return null;
+      }
+
+      hashMap.writeBuffers.setReadPoint(nextTailOffset, readPos);
+
+      // Get the value length.
+      int valueLength = (int) hashMap.writeBuffers.readVLong(readPos);
+
+      // Now read the relative offset to next record. Next record is always before the
+      // previous record in the write buffers (see writeBuffers javadoc).
+      long delta = hashMap.writeBuffers.readVLong(readPos);
+      long newTailOffset = delta == 0 ? 0 : (nextTailOffset - delta);
+
+      // The value is before the value record offset.  Make byte segment reference absolute.
+      byteSegmentRef.reset(nextTailOffset - valueLength, valueLength);
+      hashMap.writeBuffers.populateValue(byteSegmentRef);
+
+      nextTailOffset = newTailOffset;
+      readIndex++;
+      return byteSegmentRef;
+    }
+
+    /**
+     * @return Whether we have read all the values or not.
+     */
+    public boolean isEof() {
+      // LOG.info("BytesBytesMultiHashMap isEof hasRows " + hasRows + " hasList " + hasList + " readIndex " + readIndex + " nextTailOffset " + nextTailOffset);
+      if (!hasRows) {
+        return true;
+      }
+
+      if (!hasList) {
+        return (readIndex > 0);
+      } else {
+        // Multiple values.
+        if (readIndex <= 1) {
+          // Careful: We have not read the list record and 2nd value yet, so nextTailOffset
+          // is not valid yet.
+          return false;
+        } else {
+          return (nextTailOffset <= 0);
+        }
+      }
+    }
+
+    /**
+     * Lets go of any references to a hash map.
+     */
+    public void forget() {
+      hashMap = null;
+      readPos = null;
+      byteSegmentRef.reset(0, 0);
+      hasRows = false;
+      readIndex = 0;
+      nextTailOffset = -1;
+    }
+  }
+
   /** The source of keys and values to put into hashtable; avoids byte copying. */
   public static interface KvSource {
     /** Write key into output. */
@@ -201,7 +435,7 @@ public final class BytesBytesMultiHashMa
   }
 
   /**
-   * Adds new value to new or existing key in hashmap.
+   * Adds new value to new or existing key in hashmap. Not thread-safe.
    * @param kv Keyvalue writer. Each method will be called at most once.
    */
   private static final byte[] FOUR_ZEROES = new byte[] { 0, 0, 0, 0 };
@@ -247,52 +481,45 @@ public final class BytesBytesMultiHashMa
     ++numValues;
   }
 
+  public ThreadSafeGetter createGetterForThread() {
+    return new ThreadSafeGetter();
+  }
+
+  /** Not thread-safe! Use createGetterForThread. */
+  public byte getValueResult(byte[] key, int offset, int length, Result hashMapResult) {
+    return getValueResult(key, offset, length, hashMapResult, writeBuffers.getReadPosition());
+  }
+
   /**
-   * Gets "lazy" values for a key (as a set of byte segments in underlying buffer).
+   * Finds a key.  Values can be read with the supplied result object.
+   *
    * @param key Key buffer.
-   * @param length Length of the key in buffer.
-   * @param result The list to use to store the results.
-   * @return the state byte for the key (see class description).
+   * @param offset the offset to the key in the buffer
+   * @param hashMapResult The object to fill in that can read the values.
+   * @param readPos Holds mutable read position for thread safety.
+   * @return The state byte.
    */
-  public byte getValueRefs(byte[] key, int length, List<WriteBuffers.ByteSegmentRef> result) {
+  private byte getValueResult(byte[] key, int offset, int length, Result hashMapResult,
+          WriteBuffers.Position readPos) {
+
+    hashMapResult.forget();
+
     // First, find first record for the key.
-    result.clear();
-    long ref = findKeyRefToRead(key, length);
+    long ref = findKeyRefToRead(key, offset, length, readPos);
     if (ref == 0) {
       return 0;
     }
+
     boolean hasList = Ref.hasList(ref);
 
     // This relies on findKeyRefToRead doing key equality check and leaving read ptr where needed.
-    long lrPtrOffset = hasList ? writeBuffers.getReadPoint() : 0;
+    long offsetAfterListRecordKeyLen = hasList ? writeBuffers.getReadPoint(readPos) : 0;
 
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref));
-    int valueLength = (int)writeBuffers.readVLong();
-    // LOG.info("Returning value at " + (Ref.getOffset(ref) - valueLength) +  " length " + valueLength);
-    result.add(new WriteBuffers.ByteSegmentRef(Ref.getOffset(ref) - valueLength, valueLength));
-    byte stateByte = Ref.getStateByte(ref);
-    if (!hasList) {
-      return stateByte;
-    }
-
-    // There're multiple records for the key; get the offset of the next one.
-    long nextTailOffset = writeBuffers.readFiveByteULong(lrPtrOffset);
-    // LOG.info("Next tail offset " + nextTailOffset);
-
-    while (nextTailOffset > 0) {
-      writeBuffers.setReadPoint(nextTailOffset);
-      valueLength = (int)writeBuffers.readVLong();
-      // LOG.info("Returning value at " + (nextTailOffset - valueLength) +  " length " + valueLength);
-      result.add(new WriteBuffers.ByteSegmentRef(nextTailOffset - valueLength, valueLength));
-      // Now read the relative offset to next record. Next record is always before the
-      // previous record in the write buffers (see writeBuffers javadoc).
-      long delta = writeBuffers.readVLong();
-      nextTailOffset = delta == 0 ? 0 : (nextTailOffset - delta);
-      // LOG.info("Delta " + delta +  ", next tail offset " + nextTailOffset);
-    }
-    return stateByte;
-  }
+    hashMapResult.set(this, Ref.getOffset(ref), hasList, offsetAfterListRecordKeyLen,
+            readPos);
 
+    return Ref.getStateByte(ref);
+  }
 
   /**
    * Take the segment reference from {@link #getValueRefs(byte[], int, List)}
@@ -341,6 +568,17 @@ public final class BytesBytesMultiHashMa
     this.keysAssigned = 0;
   }
 
+  public void expandAndRehashToTarget(int estimateNewRowCount) {
+    int oldRefsCount = refs.length;
+    int newRefsCount = oldRefsCount + estimateNewRowCount;
+    if (resizeThreshold <= newRefsCount) {
+      newRefsCount =
+          (Long.bitCount(newRefsCount) == 1) ? estimateNewRowCount : nextHighestPowerOfTwo(newRefsCount);
+      expandAndRehashImpl(newRefsCount);
+      LOG.info("Expand and rehash to " + newRefsCount + " from " + oldRefsCount);
+    }
+  }
+
   private static void validateCapacity(long capacity) {
     if (Long.bitCount(capacity) != 1) {
       throw new AssertionError("Capacity must be a power of two");
@@ -388,9 +626,10 @@ public final class BytesBytesMultiHashMa
    * @param length Read key length.
    * @return The ref to use for reading.
    */
-  private long findKeyRefToRead(byte[] key, int length) {
+  private long findKeyRefToRead(byte[] key, int offset, int length,
+          WriteBuffers.Position readPos) {
     final int bucketMask = (refs.length - 1);
-    int hashCode = writeBuffers.hashCode(key, 0, length);
+    int hashCode = writeBuffers.hashCode(key, offset, length);
     int slot = hashCode & bucketMask;
     // LOG.info("Read hash code for " + Utils.toStringBinary(key, 0, length)
     //   + " is " + Integer.toBinaryString(hashCode) + " - " + slot);
@@ -402,9 +641,10 @@ public final class BytesBytesMultiHashMa
       if (ref == 0) {
         return 0;
       }
-      if (isSameKey(key, length, ref, hashCode)) {
+      if (isSameKey(key, offset, length, ref, hashCode, readPos)) {
         return ref;
       }
+      ++metricGetConflict;
       probeSlot += (++i);
       if (i > largestNumberOfSteps) {
         // We know we never went that far when we were inserting.
@@ -453,7 +693,7 @@ public final class BytesBytesMultiHashMa
     if (!compareHashBits(ref, hashCode)) {
       return false; // Hash bits in ref don't match.
     }
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref));
+    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, null));
     int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
     if (keyLength != cmpLength) {
       return false;
@@ -471,15 +711,21 @@ public final class BytesBytesMultiHashMa
   /**
    * Same as {@link #isSameKey(long, int, long, int)} but for externally stored key.
    */
-  private boolean isSameKey(byte[] key, int length, long ref, int hashCode) {
+  private boolean isSameKey(byte[] key, int offset, int length, long ref, int hashCode,
+      WriteBuffers.Position readPos) {
     if (!compareHashBits(ref, hashCode)) {
       return false;  // Hash bits don't match.
     }
-    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref));
-    int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+    writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, readPos), readPos);
+    int valueLength = (int)writeBuffers.readVLong(readPos),
+        keyLength = (int)writeBuffers.readVLong(readPos);
     long keyOffset = Ref.getOffset(ref) - (valueLength + keyLength);
     // See the comment in the other isSameKey
-    return writeBuffers.isEqual(key, length, keyOffset, keyLength);
+    if (offset == 0) {
+      return writeBuffers.isEqual(key, length, keyOffset, keyLength);
+    } else {
+      return writeBuffers.isEqual(key, offset, length, keyOffset, keyLength);
+    }
   }
 
   private boolean compareHashBits(long ref, int hashCode) {
@@ -491,19 +737,24 @@ public final class BytesBytesMultiHashMa
    * @param ref Reference.
    * @return The offset to value and key length vlongs of the first record referenced by ref.
    */
-  private long getFirstRecordLengthsOffset(long ref) {
+  private long getFirstRecordLengthsOffset(long ref, WriteBuffers.Position readPos) {
     long tailOffset = Ref.getOffset(ref);
     if (Ref.hasList(ref)) {
-      long relativeOffset = writeBuffers.readFiveByteULong(tailOffset);
+      long relativeOffset = (readPos == null) ? writeBuffers.readNByteLong(tailOffset, 5)
+          : writeBuffers.readNByteLong(tailOffset, 5, readPos);
       tailOffset += relativeOffset;
     }
     return tailOffset;
   }
 
   private void expandAndRehash() {
-    long expandTime = System.nanoTime();
-    final long[] oldRefs = refs;
     long capacity = refs.length << 1;
+    expandAndRehashImpl(capacity);
+  }
+  
+  private void expandAndRehashImpl(long capacity) {
+    long expandTime = System.currentTimeMillis();
+    final long[] oldRefs = refs;
     validateCapacity(capacity);
     long[] newRefs = new long[(int)capacity];
 
@@ -522,10 +773,10 @@ public final class BytesBytesMultiHashMa
       // TODO: we could actually store a bit flag in ref indicating whether this is a hash
       //       match or a probe, and in the former case use hash bits (for a first few resizes).
       // int hashCodeOrPart = oldSlot | Ref.getNthHashBit(oldRef, startingHashBitCount, newHashBitCount);
-      writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef));
+      writeBuffers.setReadPoint(getFirstRecordLengthsOffset(oldRef, null));
       // Read the value and key length for the first record.
-      int hashCode = writeBuffers.readInt(Ref.getOffset(oldRef)
-          - writeBuffers.readVLong() - writeBuffers.readVLong() - 4);
+      int hashCode = (int)writeBuffers.readNByteLong(Ref.getOffset(oldRef)
+          - writeBuffers.readVLong() - writeBuffers.readVLong() - 4, 4);
       int probeSteps = relocateKeyRef(newRefs, oldRef, hashCode);
       maxSteps = Math.max(probeSteps, maxSteps);
     }
@@ -533,9 +784,8 @@ public final class BytesBytesMultiHashMa
     this.largestNumberOfSteps = maxSteps;
     this.hashBitCount = newHashBitCount;
     this.resizeThreshold = (int)(capacity * loadFactor);
-    metricExpandsUs += (System.nanoTime() - expandTime);
+    metricExpandsMs += (System.currentTimeMillis() - expandTime);
     ++metricExpands;
-
   }
 
   /**
@@ -576,7 +826,7 @@ public final class BytesBytesMultiHashMa
    */
   private void addRecordToList(long lrPtrOffset, long tailOffset) {
     // Now, insert this record into the list.
-    long prevHeadOffset = writeBuffers.readFiveByteULong(lrPtrOffset);
+    long prevHeadOffset = writeBuffers.readNByteLong(lrPtrOffset, 5);
     // LOG.info("Reading offset " + prevHeadOffset + " at " + lrPtrOffset);
     assert prevHeadOffset < tailOffset; // We replace an earlier element, must have lower offset.
     writeBuffers.writeFiveByteULong(lrPtrOffset, tailOffset);
@@ -632,11 +882,10 @@ public final class BytesBytesMultiHashMa
     return tailOffset;
   }
 
-  /** Writes the debug dump of the table into logs. */
+  /** Writes the debug dump of the table into logs. Not thread-safe. */
   public void debugDumpTable() {
     StringBuilder dump = new StringBuilder(keysAssigned + " keys\n");
     TreeMap<Long, Integer> byteIntervals = new TreeMap<Long, Integer>();
-    List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>();
     int examined = 0;
     for (int slot = 0; slot < refs.length; ++slot) {
       long ref = refs[slot];
@@ -644,10 +893,11 @@ public final class BytesBytesMultiHashMa
         continue;
       }
       ++examined;
-      long recOffset = getFirstRecordLengthsOffset(ref);
+      long recOffset = getFirstRecordLengthsOffset(ref, null);
       long tailOffset = Ref.getOffset(ref);
       writeBuffers.setReadPoint(recOffset);
-      int valueLength = (int)writeBuffers.readVLong(), keyLength = (int)writeBuffers.readVLong();
+      int valueLength = (int)writeBuffers.readVLong(),
+          keyLength = (int)writeBuffers.readVLong();
       long ptrOffset = writeBuffers.getReadPoint();
       if (Ref.hasList(ref)) {
         byteIntervals.put(recOffset, (int)(ptrOffset + 5 - recOffset));
@@ -658,9 +908,17 @@ public final class BytesBytesMultiHashMa
       byteIntervals.put(keyOffset - 4, keyLength + 4);
       writeBuffers.populateValue(fakeRef);
       System.arraycopy(fakeRef.getBytes(), (int)fakeRef.getOffset(), key, 0, keyLength);
-      getValueRefs(key, key.length, results);
       dump.append(Utils.toStringBinary(key, 0, key.length)).append(" ref [").append(dumpRef(ref))
-        .append("]: ").append(results.size()).append(" rows\n");
+        .append("]: ");
+      Result hashMapResult = new Result();
+      getValueResult(key, 0, key.length, hashMapResult);
+      List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>();
+      WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.first();
+      while (byteSegmentRef != null) {
+        results.add(hashMapResult.byteSegmentRef);
+        byteSegmentRef = hashMapResult.next();
+      }
+      dump.append(results.size()).append(" rows\n");
       for (int i = 0; i < results.size(); ++i) {
         WriteBuffers.ByteSegmentRef segment = results.get(i);
         byteIntervals.put(segment.getOffset(),
@@ -753,7 +1011,8 @@ public final class BytesBytesMultiHashMa
   public void debugDumpMetrics() {
     LOG.info("Map metrics: keys allocated " + this.refs.length +", keys assigned " + keysAssigned
         + ", write conflict " + metricPutConflict  + ", write max dist " + largestNumberOfSteps
-        + ", expanded " + metricExpands + " times in " + metricExpandsUs + "us");
+        + ", read conflict " + metricGetConflict
+        + ", expanded " + metricExpands + " times in " + metricExpandsMs + "ms");
   }
 
   private void debugDumpKeyProbe(long keyOffset, int keyLength, int hashCode, int finalSlot) {

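To summarize the reworked read path above: callers now reuse a Result object and walk the value chain lazily instead of collecting ByteSegmentRefs into a list per lookup. A minimal single-threaded sketch of the intended usage, assuming a populated map; the names map, keyBytes and process are illustrative and not part of this patch:

    // Reusable result object; getValueResult fills it and returns the key's state byte.
    BytesBytesMultiHashMap.Result result = new BytesBytesMultiHashMap.Result();
    byte stateByte = map.getValueResult(keyBytes, 0, keyBytes.length, result);
    if (result.hasRows()) {
      // Values are exposed as byte segments over the write buffers; nothing is
      // copied into an intermediate List as the removed getValueRefs() did.
      for (WriteBuffers.ByteSegmentRef ref = result.first(); ref != null; ref = result.next()) {
        process(ref.getBytes(), (int) ref.getOffset(), ref.getLength());  // illustrative sink
      }
    }
    result.forget();  // drop references to the map once the values are consumed

isSingleRow() and isEof() let callers branch without chasing the whole chain, which is what the MapJoin containers further down rely on.
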
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java Tue Apr 14 23:36:02 2015
@@ -208,6 +208,16 @@ public class FlatRowContainer extends Ab
   }
 
   @Override
+  public boolean hasRows() throws HiveException {
+    return rowCount() > 0;
+  }
+
+  @Override
+  public boolean isSingleRow() throws HiveException {
+    return rowCount() == 1;
+  }
+
+  @Override
   public int rowCount() throws HiveException {
     return rowLength > 0 ? (array.length / rowLength) : -rowLength; // see rowLength javadoc
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java Tue Apr 14 23:36:02 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.p
 
 
 import com.esotericsoftware.kryo.Kryo;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -123,15 +124,20 @@ public class HybridHashTableContainer im
     /* Restore the hashmap from disk by deserializing it.
      * Currently Kryo is used for this purpose.
      */
-    public BytesBytesMultiHashMap getHashMapFromDisk()
+    public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
         throws IOException, ClassNotFoundException {
       if (hashMapSpilledOnCreation) {
-        return new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, -1);
+        return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity) , loadFactor, wbSize, -1);
       } else {
         InputStream inputStream = Files.newInputStream(hashMapLocalPath);
         com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
         Kryo kryo = Utilities.runtimeSerializationKryo.get();
         BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
+
+        if (initialCapacity > 0) {
+          restoredHashMap.expandAndRehashToTarget(initialCapacity);
+        }
+
         input.close();
         inputStream.close();
         Files.delete(hashMapLocalPath);
@@ -163,7 +169,8 @@ public class HybridHashTableContainer im
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsage, long tableSize)
       throws SerDeException {
-    this(HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
+    this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
+         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
          HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
          HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
          HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
@@ -171,22 +178,27 @@ public class HybridHashTableContainer im
          tableSize, keyCount, memUsage);
   }
 
-  private HybridHashTableContainer(int threshold, float loadFactor, int wbSize,
+  private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor, int wbSize,
                                    long noConditionalTaskThreshold, int memCheckFreq, long tableSize,
                                    long keyCount, long memUsage) throws SerDeException {
+
+    int newKeyCount = HashMapWrapper.calculateTableSize(
+        keyCountAdj, threshold, loadFactor, keyCount);
+
     memoryThreshold = noConditionalTaskThreshold;
-    tableRowSize = tableSize / keyCount;
+    tableRowSize = tableSize / newKeyCount;
     memoryCheckFrequency = memCheckFreq;
 
     int numPartitions = calcNumPartitions(tableSize, wbSize); // estimate # of partitions to create
     hashPartitions = new HashPartition[numPartitions];
     int numPartitionsSpilledOnCreation = 0;
     long memoryAllocated = 0;
+    int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
     for (int i = 0; i < numPartitions; i++) {
       if (i == 0) { // We unconditionally create a hashmap for the first hash partition
-        hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage, true);
+        hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage, true);
       } else {
-        hashPartitions[i] = new HashPartition(threshold, loadFactor, wbSize, memUsage,
+        hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, wbSize, memUsage,
                                               memoryAllocated + wbSize < memoryThreshold);
       }
       if (isHashMapSpilledOnCreation(i)) {
@@ -555,7 +567,7 @@ public class HybridHashTableContainer im
 
     @Override
     public MapJoinRowContainer getCurrentRows() {
-      return currentValue.isEmpty() ? null : currentValue;
+      return !currentValue.hasRows() ? null : currentValue;
     }
 
     @Override
@@ -568,8 +580,8 @@ public class HybridHashTableContainer im
   private class ReusableRowContainer
     implements MapJoinRowContainer, AbstractRowContainer.RowIterator<List<Object>> {
     private byte aliasFilter;
-    private List<WriteBuffers.ByteSegmentRef> refs;
-    private int currentRow;
+    private BytesBytesMultiHashMap.Result hashMapResult;
+
     /**
      * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row.
      * This container does not normally support adding rows; this is for the dummy row.
@@ -589,6 +601,7 @@ public class HybridHashTableContainer im
         valueStruct = null; // No rows?
       }
       uselessIndirection = new ByteArrayRef();
+      hashMapResult = new BytesBytesMultiHashMap.Result();
       clearRows();
     }
 
@@ -600,57 +613,58 @@ public class HybridHashTableContainer im
      *        the evaluation for this big table row will be postponed.
      */
     public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException {
-      if (refs == null) {
-        refs = new ArrayList<WriteBuffers.ByteSegmentRef>(0);
-      }
-
       int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength());
       partitionId = keyHash & (hashPartitions.length - 1);
 
       // If the target hash table is on disk, spill this row to disk as well to be processed later
       if (isOnDisk(partitionId)) {
         toSpillPartitionId = partitionId;
-        refs.clear();
+        hashMapResult.forget();
         return JoinUtil.JoinResult.SPILL;
       }
       else {
-        byte aliasFilter = hashPartitions[partitionId].hashMap.getValueRefs(
-            output.getData(), output.getLength(), refs);
-        this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter;
-        this.dummyRow = null;
-        if (refs.isEmpty()) {
-          return JoinUtil.JoinResult.NOMATCH;
-        }
-        else {
+        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
+        dummyRow = null;
+        if (hashMapResult.hasRows()) {
           return JoinUtil.JoinResult.MATCH;
+        } else {
+          aliasFilter = (byte) 0xff;
+          return JoinUtil.JoinResult.NOMATCH;
         }
       }
     }
 
-    public boolean isEmpty() {
-      return refs.isEmpty() && (dummyRow == null);
+    @Override
+    public boolean hasRows() {
+      return hashMapResult.hasRows() || (dummyRow != null);
+    }
+
+    @Override
+    public boolean isSingleRow() {
+      if (!hashMapResult.hasRows()) {
+        return (dummyRow != null);
+      }
+      return hashMapResult.isSingleRow();
     }
 
     // Implementation of row container
     @Override
-    public RowIterator<List<Object>> rowIter() throws HiveException {
-      currentRow = -1;
+    public AbstractRowContainer.RowIterator<List<Object>> rowIter() throws HiveException {
       return this;
     }
 
     @Override
     public int rowCount() throws HiveException {
-      return dummyRow != null ? 1 : refs.size();
+      // For performance reasons we do not want to chase the values to the end to determine
+      // the count.  Use hasRows and isSingleRow instead.
+      throw new UnsupportedOperationException("Getting the row count not supported");
     }
 
     @Override
     public void clearRows() {
       // Doesn't clear underlying hashtable
-      if (refs != null) {
-        refs.clear();
-      }
+      hashMapResult.forget();
       dummyRow = null;
-      currentRow = -1;
       aliasFilter = (byte) 0xff;
     }
 
@@ -667,36 +681,47 @@ public class HybridHashTableContainer im
     // Implementation of row iterator
     @Override
     public List<Object> first() throws HiveException {
-      currentRow = 0;
-      return next();
-    }
-
 
-    @Override
-    public List<Object> next() throws HiveException {
+      // A little strange that we forget the dummy row on read.
       if (dummyRow != null) {
         List<Object> result = dummyRow;
         dummyRow = null;
         return result;
       }
-      if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows");
-      if (refs.size() == currentRow) return null;
-      WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++);
+
+      WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.first();
+      if (byteSegmentRef == null) {
+        return null;
+      } else {
+        return uppack(byteSegmentRef);
+      }
+
+    }
+
+    @Override
+    public List<Object> next() throws HiveException {
+
+      WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.next();
+      if (byteSegmentRef == null) {
+        return null;
+      } else {
+        return uppack(byteSegmentRef);
+      }
+
+    }
+
+    private List<Object> uppack(WriteBuffers.ByteSegmentRef ref) throws HiveException {
       if (ref.getLength() == 0) {
         return EMPTY_LIST; // shortcut, 0 length means no fields
       }
-      if (ref.getBytes() == null) {
-        // partitionId is derived from previously calculated value in setFromOutput()
-        hashPartitions[partitionId].hashMap.populateValue(ref);
-      }
       uselessIndirection.setData(ref.getBytes());
       valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength());
-      return valueStruct.getFieldsAsList();
+      return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
     }
 
     @Override
     public void addRow(List<Object> t) {
-      if (dummyRow != null || !refs.isEmpty()) {
+      if (dummyRow != null || hashMapResult.hasRows()) {
         throw new RuntimeException("Cannot add rows when not empty");
       }
       dummyRow = t;

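The sizing change above can be read as two steps: the container derives an adjusted key count, splits it across partitions, and a partition that was spilled at creation time can later be pre-grown to that size when it is read back from disk. Roughly, using the same helpers the patch calls (numPartitions and the partition variable are illustrative):

    int newKeyCount = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
    int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
    // On reload, the restored map is expanded once up front (expandAndRehashToTarget)
    // instead of rehashing repeatedly while rows stream back in.
    BytesBytesMultiHashMap restored = partition.getHashMapFromDisk(initialCapacity);
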
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Tue Apr 14 23:36:02 2015
@@ -417,7 +417,7 @@ public class MapJoinBytesTableContainer
 
     @Override
     public MapJoinRowContainer getCurrentRows() {
-      return currentValue.isEmpty() ? null : currentValue;
+      return !currentValue.hasRows() ? null : currentValue;
     }
 
     @Override
@@ -430,8 +430,11 @@ public class MapJoinBytesTableContainer
   private class ReusableRowContainer
     implements MapJoinRowContainer, AbstractRowContainer.RowIterator<List<Object>> {
     private byte aliasFilter;
-    private List<WriteBuffers.ByteSegmentRef> refs;
-    private int currentRow;
+
+    /** Hash table wrapper specific to the container. */
+    private final BytesBytesMultiHashMap.ThreadSafeGetter threadSafeHashMapGetter;
+    private BytesBytesMultiHashMap.Result hashMapResult;
+
     /**
      * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row.
      * This container does not normally support adding rows; this is for the dummy row.
@@ -449,48 +452,56 @@ public class MapJoinBytesTableContainer
         valueStruct = null; // No rows?
       }
       uselessIndirection = new ByteArrayRef();
+      threadSafeHashMapGetter = hashMap.createGetterForThread();
+      hashMapResult = new BytesBytesMultiHashMap.Result();
       clearRows();
     }
 
     public JoinUtil.JoinResult setFromOutput(Output output) {
-      if (refs == null) {
-        refs = new ArrayList<WriteBuffers.ByteSegmentRef>();
-      }
-      byte aliasFilter = hashMap.getValueRefs(output.getData(), output.getLength(), refs);
-      this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter;
-      this.dummyRow = null;
-      if (refs.isEmpty()) {
-        return JoinUtil.JoinResult.NOMATCH;
-      }
-      else {
+
+      aliasFilter = threadSafeHashMapGetter.getValueResult(
+              output.getData(), 0, output.getLength(), hashMapResult);
+      dummyRow = null;
+      if (hashMapResult.hasRows()) {
         return JoinUtil.JoinResult.MATCH;
+      } else {
+        aliasFilter = (byte) 0xff;
+        return JoinUtil.JoinResult.NOMATCH;
       }
+
+    }
+
+    @Override
+    public boolean hasRows() {
+      return hashMapResult.hasRows() || (dummyRow != null);
     }
 
-    public boolean isEmpty() {
-      return refs.isEmpty() && (dummyRow == null);
+    @Override
+    public boolean isSingleRow() {
+      if (!hashMapResult.hasRows()) {
+        return (dummyRow != null);
+      }
+      return hashMapResult.isSingleRow();
     }
 
     // Implementation of row container
     @Override
     public AbstractRowContainer.RowIterator<List<Object>> rowIter() throws HiveException {
-      currentRow = -1;
       return this;
     }
 
     @Override
     public int rowCount() throws HiveException {
-      return dummyRow != null ? 1 : refs.size();
+      // For performance reasons we do not want to chase the values to the end to determine
+      // the count.  Use hasRows and isSingleRow instead.
+      throw new UnsupportedOperationException("Getting the row count not supported");
     }
 
     @Override
     public void clearRows() {
       // Doesn't clear underlying hashtable
-      if (refs != null) {
-        refs.clear();
-      }
+      hashMapResult.forget();
       dummyRow = null;
-      currentRow = -1;
       aliasFilter = (byte) 0xff;
     }
 
@@ -507,30 +518,39 @@ public class MapJoinBytesTableContainer
     // Implementation of row iterator
     @Override
     public List<Object> first() throws HiveException {
-      currentRow = 0;
-      return nextInternal();
-    }
 
-    @Override
-    public List<Object> next() throws HiveException {
-      return nextInternal();
-    }
-
-    private List<Object> nextInternal() throws HiveException {
+      // A little strange that we forget the dummy row on read.
       if (dummyRow != null) {
         List<Object> result = dummyRow;
         dummyRow = null;
         return result;
       }
-      if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows");
-      if (refs.size() == currentRow) return null;
-      WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++);
+
+      WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.first();
+      if (byteSegmentRef == null) {
+        return null;
+      } else {
+        return uppack(byteSegmentRef);
+      }
+
+    }
+
+    @Override
+    public List<Object> next() throws HiveException {
+
+      WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.next();
+      if (byteSegmentRef == null) {
+        return null;
+      } else {
+        return uppack(byteSegmentRef);
+      }
+
+    }
+
+    private List<Object> uppack(WriteBuffers.ByteSegmentRef ref) throws HiveException {
       if (ref.getLength() == 0) {
         return EMPTY_LIST; // shortcut, 0 length means no fields
       }
-      if (ref.getBytes() == null) {
-        hashMap.populateValue(ref);
-      }
       uselessIndirection.setData(ref.getBytes());
       valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength());
       return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
@@ -538,7 +558,7 @@ public class MapJoinBytesTableContainer
 
     @Override
     public void addRow(List<Object> t) {
-      if (dummyRow != null || !refs.isEmpty()) {
+      if (dummyRow != null || hashMapResult.hasRows()) {
         throw new RuntimeException("Cannot add rows when not empty");
       }
       dummyRow = t;

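The MapJoinBytesTableContainer changes follow the same Result pattern, but reads go through a per-container ThreadSafeGetter so that concurrent probes do not share the map's single read position. A sketch of that path, with names kept close to the patch (key is illustrative):

    // Each reader owns its getter and result, so lookups from different threads
    // keep independent read positions into the shared write buffers.
    BytesBytesMultiHashMap.ThreadSafeGetter getter = hashMap.createGetterForThread();
    BytesBytesMultiHashMap.Result result = new BytesBytesMultiHashMap.Result();
    byte aliasFilter = getter.getValueResult(key, 0, key.length, result);
    JoinUtil.JoinResult jr = result.hasRows() ? JoinUtil.JoinResult.MATCH
                                              : JoinUtil.JoinResult.NOMATCH;
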
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java Tue Apr 14 23:36:02 2015
@@ -81,6 +81,16 @@ public class MapJoinEagerRowContainer
     return null;
   }
 
+  @Override
+  public boolean hasRows() {
+    return list.size() > 0;
+  }
+
+  @Override
+  public boolean isSingleRow() {
+    return list.size() == 1;
+  }
+
   /**
    * Get the number of elements in the RowContainer.
    *

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Apr 14 23:36:02 2015
@@ -331,6 +331,17 @@ public class RowContainer<ROW extends Li
     }
   }
 
+
+  @Override
+  public boolean hasRows() {
+    return size > 0;
+  }
+
+  @Override
+  public boolean isSingleRow() {
+    return size == 1;
+  }
+
   /**
    * Get the number of elements in the RowContainer.
    *

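Across the row containers, hasRows() and isSingleRow() replace the rowCount()-based emptiness checks, since the hash-map-backed container above now throws from rowCount() rather than chase the full value chain. The intended caller pattern is roughly the following, where container stands for any MapJoinRowContainer implementation:

    if (!container.hasRows()) {
      // no values for this key
    } else if (container.isSingleRow()) {
      // exactly one value; cheap to answer without counting
    } else {
      // iterate with rowIter() and never ask for rowCount()
    }
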
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java?rev=1673583&r1=1673582&r2=1673583&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java Tue Apr 14 23:36:02 2015
@@ -95,6 +95,17 @@ public class UnwrapRowContainer
     internal.addRow(t);
   }
 
+
+  @Override
+  public boolean hasRows() throws HiveException {
+    return internal.hasRows();
+  }
+
+  @Override
+  public boolean isSingleRow() throws HiveException {
+    return internal.isSingleRow();
+  }
+
   @Override
   public int rowCount() throws HiveException {
     return internal.rowCount();


