hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gunt...@apache.org
Subject svn commit: r1671671 - in /hive/branches/llap: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
Date Mon, 06 Apr 2015 19:42:58 GMT
Author: gunther
Date: Mon Apr  6 19:42:58 2015
New Revision: 1671671

URL: http://svn.apache.org/r1671671
Log:
Merge latest trunk into branch. (Gunther Hagleitner)

Modified:
    hive/branches/llap/   (props changed)
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java

Propchange: hive/branches/llap/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr  6 19:42:58 2015
@@ -4,4 +4,4 @@
 /hive/branches/spark:1608589-1660298
 /hive/branches/tez:1494760-1622766
 /hive/branches/vectorization:1466908-1527856
-/hive/trunk:1624170-1671606
+/hive/trunk:1624170-1671667

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1671671&r1=1671670&r2=1671671&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Mon Apr  6 19:42:58 2015
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.te
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -78,6 +79,8 @@ public class CommonMergeJoinOperator ext
   transient List<Object> otherKey = null;
   transient List<Object> values = null;
   transient RecordSource[] sources;
+  transient WritableComparator[][] keyComparators;
+
   transient List<Operator<? extends OperatorDesc>> originalParents =
       new ArrayList<Operator<? extends OperatorDesc>>();
 
@@ -105,6 +108,11 @@ public class CommonMergeJoinOperator ext
     nextKeyWritables = new ArrayList[maxAlias];
     fetchDone = new boolean[maxAlias];
     foundNextKeyGroup = new boolean[maxAlias];
+    keyComparators = new WritableComparator[maxAlias][];
+
+    for (Entry<Byte, List<ExprNodeDesc>> entry : conf.getKeys().entrySet()) {
+      keyComparators[entry.getKey().intValue()] = new WritableComparator[entry.getValue().size()];
+    }
 
     int bucketSize;
 
@@ -279,7 +287,7 @@ public class CommonMergeJoinOperator ext
         result[pos] = -1;
         continue;
       }
-      result[pos] = compareKeys(key, smallestOne);
+      result[pos] = compareKeys(pos, key, smallestOne);
       if (result[pos] < 0) {
         smallestOne = key;
       }
@@ -411,14 +419,16 @@ public class CommonMergeJoinOperator ext
     this.nextGroupStorage[t] = oldRowContainer;
   }
 
+  @SuppressWarnings("rawtypes")
   private boolean processKey(byte alias, List<Object> key) throws HiveException {
     List<Object> keyWritable = keyWritables[alias];
     if (keyWritable == null) {
       // the first group.
       keyWritables[alias] = key;
+      keyComparators[alias] = new WritableComparator[key.size()];
       return false;
     } else {
-      int cmp = compareKeys(key, keyWritable);
+      int cmp = compareKeys(alias, key, keyWritable);
       if (cmp != 0) {
         nextKeyWritables[alias] = key;
         return true;
@@ -428,30 +438,42 @@ public class CommonMergeJoinOperator ext
   }
 
   @SuppressWarnings("rawtypes")
-  private int compareKeys(List<Object> k1, List<Object> k2) {
-    int ret = 0;
+  private int compareKeys(byte alias, List<Object> k1, List<Object> k2) {
+    final WritableComparator[] comparators = keyComparators[alias];
 
     // join keys have difference sizes?
-    ret = k1.size() - k2.size();
-    if (ret != 0) {
-      return ret;
+    if (k1.size() != k2.size()) {
+      return k1.size() - k2.size();
+    }
+
+    if (comparators.length == 0) {
+      // cross-product - no keys really
+      return 0;
     }
 
-    for (int i = 0; i < k1.size(); i++) {
+    if (comparators.length > 1) {
+      // rare case
+      return compareKeysMany(comparators, k1, k2);
+    } else {
+      return compareKey(comparators, 0,
+          (WritableComparable) k1.get(0),
+          (WritableComparable) k2.get(0),
+          nullsafes != null ? nullsafes[0]: false);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  private int compareKeysMany(WritableComparator[] comparators,
+      final List<Object> k1,
+      final List<Object> k2) {
+    // invariant: k1.size == k2.size
+    int ret = 0;
+    final int size = k1.size();
+    for (int i = 0; i < size; i++) {
       WritableComparable key_1 = (WritableComparable) k1.get(i);
       WritableComparable key_2 = (WritableComparable) k2.get(i);
-      if (key_1 == null && key_2 == null) {
-        if (nullsafes != null && nullsafes[i]) {
-          continue;
-        } else {
-          return -1;
-        }
-      } else if (key_1 == null) {
-        return -1;
-      } else if (key_2 == null) {
-        return 1;
-      }
-      ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2);
+      ret = compareKey(comparators, i, key_1, key_2,
+          nullsafes != null ? nullsafes[i] : false);
       if (ret != 0) {
         return ret;
       }
@@ -459,6 +481,30 @@ public class CommonMergeJoinOperator ext
     return ret;
   }
 
+  @SuppressWarnings("rawtypes")
+  private int compareKey(final WritableComparator comparators[], final int pos,
+      final WritableComparable key_1,
+      final WritableComparable key_2,
+      final boolean nullsafe) {
+
+    if (key_1 == null && key_2 == null) {
+      if (nullsafe) {
+        return 0;
+      } else {
+        return -1;
+      }
+    } else if (key_1 == null) {
+      return -1;
+    } else if (key_2 == null) {
+      return 1;
+    }
+
+    if (comparators[pos] == null) {
+      comparators[pos] = WritableComparator.get(key_1.getClass());
+    }
+    return comparators[pos].compare(key_1, key_2);
+  }
+
   @SuppressWarnings("unchecked")
   private List<Object> mergeJoinComputeKeys(Object row, Byte alias) throws HiveException {
     if ((joinKeysObjectInspectors != null) && (joinKeysObjectInspectors[alias] != null)) {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1671671&r1=1671670&r2=1671671&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Mon Apr  6 19:42:58 2015
@@ -187,6 +187,7 @@ public abstract class Task<T extends Ser
     this.childTasks = childTasks;
   }
 
+  @Override
   public List<? extends Node> getChildren() {
     return getChildTasks();
   }
@@ -521,7 +522,7 @@ public abstract class Task<T extends Ser
     return exception;
   }
 
-  void setException(Throwable ex) {
+  protected void setException(Throwable ex) {
     exception = ex;
   }
 
@@ -542,10 +543,12 @@ public abstract class Task<T extends Ser
     return getId() + ":" + getType();
   }
 
+  @Override
   public int hashCode() {
     return toString().hashCode();
   }
 
+  @Override
   public boolean equals(Object obj) {
     return toString().equals(String.valueOf(obj));
   }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1671671&r1=1671670&r2=1671671&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Apr  6 19:42:58 2015
@@ -99,7 +99,7 @@ public class TezJobMonitor {
 
   // in-place progress update related variables
   private int lines;
-  private PrintStream out;
+  private final PrintStream out;
   private String separator;
 
   private transient LogHelper console;
@@ -117,6 +117,8 @@ public class TezJobMonitor {
   private static final List<DAGClient> shutdownList;
   private Map<String, BaseWork> workMap;
 
+  private StringBuffer diagnostics;
+
   static {
     shutdownList = Collections.synchronizedList(new LinkedList<DAGClient>());
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -254,6 +256,7 @@ public class TezJobMonitor {
       DAG dag) throws InterruptedException {
     DAGStatus status = null;
     completed = new HashSet<String>();
+    diagnostics = new StringBuffer();
 
     boolean running = false;
     boolean done = false;
@@ -399,6 +402,7 @@ public class TezJobMonitor {
           if (rc != 0 && status != null) {
             for (String diag : status.getDiagnostics()) {
               console.printError(diag);
+              diagnostics.append(diag);
             }
           }
           shutdownList.remove(dagClient);
@@ -821,11 +825,11 @@ public class TezJobMonitor {
           perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
         }
         if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
-          
+
           if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
             perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
           }
-          
+
           /* vertex is started, but not complete */
           if (failed > 0) {
             reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
@@ -846,4 +850,8 @@ public class TezJobMonitor {
 
     return reportBuffer.toString();
   }
+
+  public String getDiagnostics() {
+    return diagnostics.toString();
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1671671&r1=1671670&r2=1671671&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Apr  6 19:42:58 2015
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
@@ -164,6 +165,9 @@ public class TezTask extends Task<TezWor
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
       rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag);
+      if (rc != 0) {
+        this.setException(new HiveException(monitor.getDiagnostics()));
+      }
 
       // fetch the counters
       Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);



Mime
View raw message