hive-commits mailing list archives

From w...@apache.org
Subject [47/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
Date Fri, 05 May 2017 17:32:34 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java b/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java
new file mode 100644
index 0000000..64f2819
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmMetrics.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import static org.apache.hadoop.hive.common.JvmMetricsInfo.*;
+
+import org.apache.hadoop.log.metrics.EventCounter;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+/**
+ * JVM and logging related metrics. Ported from Hadoop JvmMetrics.
+ * Mostly used by various servers as a part of the metrics they export.
+ */
+public class JvmMetrics implements MetricsSource {
+  enum Singleton {
+    INSTANCE;
+
+    JvmMetrics impl;
+
+    synchronized JvmMetrics init(String processName, String sessionId) {
+      if (impl == null) {
+        impl = create(processName, sessionId, DefaultMetricsSystem.instance());
+      }
+      return impl;
+    }
+  }
+
+  static final float M = 1024*1024;
+
+  final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+  final List<GarbageCollectorMXBean> gcBeans =
+      ManagementFactory.getGarbageCollectorMXBeans();
+  final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+  final String processName, sessionId;
+  private JvmPauseMonitor pauseMonitor = null;
+  final ConcurrentHashMap<String, MetricsInfo[]> gcInfoCache =
+      new ConcurrentHashMap<String, MetricsInfo[]>();
+
+  JvmMetrics(String processName, String sessionId) {
+    this.processName = processName;
+    this.sessionId = sessionId;
+  }
+
+  public void setPauseMonitor(final JvmPauseMonitor pauseMonitor) {
+    this.pauseMonitor = pauseMonitor;
+  }
+
+  public static JvmMetrics create(String processName, String sessionId, MetricsSystem ms) {
+    return ms.register(JvmMetrics.name(), JvmMetrics.description(),
+        new JvmMetrics(processName, sessionId));
+  }
+
+  public static JvmMetrics initSingleton(String processName, String sessionId) {
+    return Singleton.INSTANCE.init(processName, sessionId);
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder rb = collector.addRecord(JvmMetrics)
+        .setContext("jvm").tag(ProcessName, processName)
+        .tag(SessionId, sessionId);
+    getMemoryUsage(rb);
+    getGcUsage(rb);
+    getThreadUsage(rb);
+    getEventCounters(rb);
+  }
+
+  private void getMemoryUsage(MetricsRecordBuilder rb) {
+    MemoryUsage memNonHeap = memoryMXBean.getNonHeapMemoryUsage();
+    MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
+    Runtime runtime = Runtime.getRuntime();
+    rb.addGauge(MemNonHeapUsedM, memNonHeap.getUsed() / M)
+        .addGauge(MemNonHeapCommittedM, memNonHeap.getCommitted() / M)
+        .addGauge(MemNonHeapMaxM, memNonHeap.getMax() / M)
+        .addGauge(MemHeapUsedM, memHeap.getUsed() / M)
+        .addGauge(MemHeapCommittedM, memHeap.getCommitted() / M)
+        .addGauge(MemHeapMaxM, memHeap.getMax() / M)
+        .addGauge(MemMaxM, runtime.maxMemory() / M);
+  }
+
+  private void getGcUsage(MetricsRecordBuilder rb) {
+    long count = 0;
+    long timeMillis = 0;
+    for (GarbageCollectorMXBean gcBean : gcBeans) {
+      long c = gcBean.getCollectionCount();
+      long t = gcBean.getCollectionTime();
+      MetricsInfo[] gcInfo = getGcInfo(gcBean.getName());
+      rb.addCounter(gcInfo[0], c).addCounter(gcInfo[1], t);
+      count += c;
+      timeMillis += t;
+    }
+    rb.addCounter(GcCount, count)
+        .addCounter(GcTimeMillis, timeMillis);
+
+    if (pauseMonitor != null) {
+      rb.addCounter(GcNumWarnThresholdExceeded,
+          pauseMonitor.getNumGcWarnThreadholdExceeded());
+      rb.addCounter(GcNumInfoThresholdExceeded,
+          pauseMonitor.getNumGcInfoThresholdExceeded());
+      rb.addCounter(GcTotalExtraSleepTime,
+          pauseMonitor.getTotalGcExtraSleepTime());
+    }
+  }
+
+  private MetricsInfo[] getGcInfo(String gcName) {
+    MetricsInfo[] gcInfo = gcInfoCache.get(gcName);
+    if (gcInfo == null) {
+      gcInfo = new MetricsInfo[2];
+      gcInfo[0] = Interns.info("GcCount" + gcName, "GC Count for " + gcName);
+      gcInfo[1] = Interns
+          .info("GcTimeMillis" + gcName, "GC Time for " + gcName);
+      MetricsInfo[] previousGcInfo = gcInfoCache.putIfAbsent(gcName, gcInfo);
+      if (previousGcInfo != null) {
+        return previousGcInfo;
+      }
+    }
+    return gcInfo;
+  }
+
+  private void getThreadUsage(MetricsRecordBuilder rb) {
+    int threadsNew = 0;
+    int threadsRunnable = 0;
+    int threadsBlocked = 0;
+    int threadsWaiting = 0;
+    int threadsTimedWaiting = 0;
+    int threadsTerminated = 0;
+    long threadIds[] = threadMXBean.getAllThreadIds();
+    for (ThreadInfo threadInfo : threadMXBean.getThreadInfo(threadIds, 0)) {
+      if (threadInfo == null) continue; // race protection
+      switch (threadInfo.getThreadState()) {
+        case NEW:           threadsNew++;           break;
+        case RUNNABLE:      threadsRunnable++;      break;
+        case BLOCKED:       threadsBlocked++;       break;
+        case WAITING:       threadsWaiting++;       break;
+        case TIMED_WAITING: threadsTimedWaiting++;  break;
+        case TERMINATED:    threadsTerminated++;    break;
+      }
+    }
+    rb.addGauge(ThreadsNew, threadsNew)
+        .addGauge(ThreadsRunnable, threadsRunnable)
+        .addGauge(ThreadsBlocked, threadsBlocked)
+        .addGauge(ThreadsWaiting, threadsWaiting)
+        .addGauge(ThreadsTimedWaiting, threadsTimedWaiting)
+        .addGauge(ThreadsTerminated, threadsTerminated);
+  }
+
+  private void getEventCounters(MetricsRecordBuilder rb) {
+    rb.addCounter(LogFatal, EventCounter.getFatal())
+        .addCounter(LogError, EventCounter.getError())
+        .addCounter(LogWarn, EventCounter.getWarn())
+        .addCounter(LogInfo, EventCounter.getInfo());
+  }
+}
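
A minimal usage sketch for the new source (not part of the diff; the process name "hiveserver2" and the session id are illustrative placeholders):

import org.apache.hadoop.hive.common.JvmMetrics;

public class JvmMetricsBootstrapSketch {
  public static void main(String[] args) {
    // Registers the JVM metrics source with the default metrics system;
    // initSingleton() is idempotent and returns the same instance on repeat calls.
    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("hiveserver2", "session-1");

    // If a JvmPauseMonitor is already running it can be attached so the
    // GcNumWarnThresholdExceeded / GcNumInfoThresholdExceeded /
    // GcTotalExtraSleepTime counters above get reported as well:
    // jvmMetrics.setPauseMonitor(pauseMonitor);
  }
}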

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java b/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java
new file mode 100644
index 0000000..3ab73c5
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/JvmMetricsInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import com.google.common.base.Objects;
+
+import org.apache.hadoop.metrics2.MetricsInfo;
+
+/**
+ * JVM and logging related metrics info instances. Ported from Hadoop JvmMetricsInfo.
+ */
+public enum JvmMetricsInfo implements MetricsInfo {
+  JvmMetrics("JVM related metrics etc."), // record info
+  // metrics
+  MemNonHeapUsedM("Non-heap memory used in MB"),
+  MemNonHeapCommittedM("Non-heap memory committed in MB"),
+  MemNonHeapMaxM("Non-heap memory max in MB"),
+  MemHeapUsedM("Heap memory used in MB"),
+  MemHeapCommittedM("Heap memory committed in MB"),
+  MemHeapMaxM("Heap memory max in MB"),
+  MemMaxM("Max memory size in MB"),
+  GcCount("Total GC count"),
+  GcTimeMillis("Total GC time in milliseconds"),
+  ThreadsNew("Number of new threads"),
+  ThreadsRunnable("Number of runnable threads"),
+  ThreadsBlocked("Number of blocked threads"),
+  ThreadsWaiting("Number of waiting threads"),
+  ThreadsTimedWaiting("Number of timed waiting threads"),
+  ThreadsTerminated("Number of terminated threads"),
+  LogFatal("Total number of fatal log events"),
+  LogError("Total number of error log events"),
+  LogWarn("Total number of warning log events"),
+  LogInfo("Total number of info log events"),
+  GcNumWarnThresholdExceeded("Number of times that the GC warn threshold is exceeded"),
+  GcNumInfoThresholdExceeded("Number of times that the GC info threshold is exceeded"),
+  GcTotalExtraSleepTime("Total GC extra sleep time in milliseconds");
+
+  private final String desc;
+
+  JvmMetricsInfo(String desc) { this.desc = desc; }
+
+  @Override public String description() { return desc; }
+
+  @Override public String toString() {
+    return Objects.toStringHelper(this)
+        .add("name", name()).add("description", desc)
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
index c2a0d9a..83f3af7 100644
--- a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
@@ -25,11 +25,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configurator;
 import org.apache.logging.log4j.core.impl.Log4jContextFactory;
+import org.apache.logging.log4j.spi.DefaultThreadContextMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -45,8 +46,15 @@ public class LogUtils {
   /**
    * Constants for log masking
    */
-  private static String KEY_TO_MASK_WITH = "password";
-  private static String MASKED_VALUE = "###_MASKED_###";
+  private static final String KEY_TO_MASK_WITH = "password";
+  private static final String MASKED_VALUE = "###_MASKED_###";
+
+  /**
+   * Constants of the key strings for the logging ThreadContext.
+   */
+  public static final String SESSIONID_LOG_KEY = "sessionId";
+  public static final String QUERYID_LOG_KEY = "queryId";
+  public static final String OPERATIONLOG_LEVEL_KEY = "operationLogLevel";
 
   @SuppressWarnings("serial")
   public static class LogInitializationException extends Exception {
@@ -110,6 +118,8 @@ public class LogUtils {
           System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId);
         }
         final boolean async = checkAndSetAsyncLogging(conf);
+        // required for MDC based routing appender so that child threads can inherit the MDC context
+        System.setProperty(DefaultThreadContextMap.INHERITABLE_MAP, "true");
         Configurator.initialize(null, log4jFileName);
         logConfigLocation(conf);
         return "Logging initialized using configuration in " + log4jConfigFile + " Async: " + async;
@@ -152,6 +162,7 @@ public class LogUtils {
     }
     if (hive_l4j != null) {
       final boolean async = checkAndSetAsyncLogging(conf);
+      System.setProperty(DefaultThreadContextMap.INHERITABLE_MAP, "true");
       Configurator.initialize(null, hive_l4j.toString());
       logConfigLocation(conf);
       return (logMessage + "\n" + "Logging initialized using configuration in " + hive_l4j +
@@ -193,4 +204,22 @@ public class LogUtils {
     }
     return value;
   }
+
+  /**
+   * Register the logging context so that the log system can print QueryId, SessionId, etc. for each message
+   */
+  public static void registerLoggingContext(Configuration conf) {
+    MDC.put(SESSIONID_LOG_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVESESSIONID));
+    MDC.put(QUERYID_LOG_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID));
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+      MDC.put(OPERATIONLOG_LEVEL_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL));
+    }
+  }
+
+  /**
+   * Unregister logging context
+   */
+  public static void unregisterLoggingContext() {
+    MDC.clear();
+  }
 }
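
For context, the intended call pattern around the new MDC helpers is sketched below (not part of the diff; conf is assumed to be the session's HiveConf):

LogUtils.registerLoggingContext(conf);   // pushes sessionId/queryId (and operation log level) into the MDC
try {
  // ... run the query / operation; log lines emitted here carry the context,
  // and child threads inherit it via the inheritable thread context map
} finally {
  LogUtils.unregisterLoggingContext();   // clears the MDC so the thread can be reused safely
}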

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
new file mode 100644
index 0000000..36ae56f
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+/**
+ * Interface that can be used to provide size estimates based on data structures held in memory for an object instance.
+ */
+public interface MemoryEstimate {
+  /**
+   * Returns the estimated memory size based on {@link org.apache.hadoop.hive.ql.util.JavaDataModel}
+   *
+   * @return estimated memory size in bytes
+   */
+  long getEstimatedMemorySize();
+}
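
A minimal implementer sketch (not part of the diff; the cache-entry class and its accounting are illustrative, assuming JavaDataModel's object()/ref() helpers):

import org.apache.hadoop.hive.common.MemoryEstimate;
import org.apache.hadoop.hive.ql.util.JavaDataModel;

class ByteArrayEntrySketch implements MemoryEstimate {
  private final byte[] payload;

  ByteArrayEntrySketch(byte[] payload) {
    this.payload = payload;
  }

  @Override
  public long getEstimatedMemorySize() {
    JavaDataModel model = JavaDataModel.get();
    // one object header for this wrapper, one reference to the array,
    // plus the raw payload bytes themselves
    return model.object() + model.ref() + payload.length;
  }
}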

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java b/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
index 926b4a6..a9e17c2 100644
--- a/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+++ b/common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
@@ -49,7 +49,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 public class StatsSetupConst {
 
-  protected final static Logger LOG = LoggerFactory.getLogger(StatsSetupConst.class.getName());
+  protected static final Logger LOG = LoggerFactory.getLogger(StatsSetupConst.class.getName());
 
   public enum StatDB {
     fs {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java b/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java
index c729991..92d37e8 100644
--- a/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java
@@ -104,13 +104,21 @@ public class StringInternUtils {
    * This method interns all the strings in the given list in place. That is,
    * it iterates over the list, replaces each element with the interned copy
    * and eventually returns the same list.
+   *
+   * Note that the provided List implementation should return a ListIterator
+   * (via list.listIterator()) that supports the set(Object) method. All List
+   * implementations in the JDK do. However, if a custom List implementation
+   * lacks this functionality, this method returns without interning its
+   * elements.
    */
   public static List<String> internStringsInList(List<String> list) {
     if (list != null) {
-      ListIterator<String> it = list.listIterator();
-      while (it.hasNext()) {
-        it.set(it.next().intern());
-      }
+      try {
+        ListIterator<String> it = list.listIterator();
+        while (it.hasNext()) {
+          it.set(it.next().intern());
+        }
+      } catch (UnsupportedOperationException e) { } // set() not implemented - ignore
     }
     return list;
   }
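
A short usage sketch (not part of the diff; the partition-values list is illustrative):

List<String> partVals = new ArrayList<>(Arrays.asList("2017", "05", "05"));
// Interning happens in place: the same list instance comes back, now holding
// canonical String copies, which saves heap when values repeat across objects.
List<String> same = StringInternUtils.internStringsInList(partVals);
assert same == partVals;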

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
index 334b93e..8f55354 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.common;
 
 import java.util.Arrays;
+import java.util.BitSet;
 
 /**
  * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
@@ -40,11 +41,12 @@ public class ValidCompactorTxnList extends ValidReadTxnList {
   }
   /**
    * @param abortedTxnList list of all aborted transactions
+   * @param abortedBits bitset marking whether the corresponding transaction is aborted
    * @param highWatermark highest committed transaction to be considered for compaction,
    *                      equivalently (lowest_open_txn - 1).
    */
-  public ValidCompactorTxnList(long[] abortedTxnList, long highWatermark) {
-    super(abortedTxnList, highWatermark);
+  public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) {
+    super(abortedTxnList, abortedBits, highWatermark); // abortedBits should be all true, as everything in exceptions is an aborted txn
     if(this.exceptions.length <= 0) {
       return;
     }
@@ -75,4 +77,9 @@ public class ValidCompactorTxnList extends ValidReadTxnList {
   public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
     return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
   }
+
+  @Override
+  public boolean isTxnAborted(long txnid) {
+    return Arrays.binarySearch(exceptions, txnid) >= 0;
+  }
 }
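
A construction sketch for the new signature (not part of the diff; the transaction ids are illustrative). Every bit is set because, for the compactor, everything in the exceptions list is an aborted transaction:

long[] abortedTxns = new long[] {7L, 9L};            // must be sorted
BitSet abortedBits = new BitSet(abortedTxns.length);
abortedBits.set(0, abortedTxns.length);              // mark every entry as aborted
ValidCompactorTxnList txnList = new ValidCompactorTxnList(abortedTxns, abortedBits, 12L);
assert txnList.isTxnAborted(7L);
assert !txnList.isTxnAborted(8L);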

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index 2f35917..4e57772 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.common;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.Arrays;
+import java.util.BitSet;
 
 /**
  * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
@@ -30,32 +31,27 @@ import java.util.Arrays;
 public class ValidReadTxnList implements ValidTxnList {
 
   protected long[] exceptions;
+  protected BitSet abortedBits; // BitSet for flagging aborted transactions. Bit is true if aborted, false if open
   //default value means there are no open txn in the snapshot
   private long minOpenTxn = Long.MAX_VALUE;
   protected long highWatermark;
 
   public ValidReadTxnList() {
-    this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE);
+    this(new long[0], new BitSet(), Long.MAX_VALUE, Long.MAX_VALUE);
   }
 
   /**
    * Used if there are no open transactions in the snapshot
    */
-  public ValidReadTxnList(long[] exceptions, long highWatermark) {
-    this(exceptions, highWatermark, Long.MAX_VALUE);
+  public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark) {
+    this(exceptions, abortedBits, highWatermark, Long.MAX_VALUE);
   }
-  public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpenTxn) {
-    if (exceptions.length == 0) {
-      this.exceptions = exceptions;
-    } else {
-      this.exceptions = exceptions.clone();
-      Arrays.sort(this.exceptions);
+  public ValidReadTxnList(long[] exceptions, BitSet abortedBits, long highWatermark, long minOpenTxn) {
+    if (exceptions.length > 0) {
       this.minOpenTxn = minOpenTxn;
-      if(this.exceptions[0] <= 0) {
-        //should never happen of course
-        throw new IllegalArgumentException("Invalid txnid: " + this.exceptions[0] + " found");
-      }
     }
+    this.exceptions = exceptions;
+    this.abortedBits = abortedBits;
     this.highWatermark = highWatermark;
   }
 
@@ -118,12 +114,28 @@ public class ValidReadTxnList implements ValidTxnList {
     buf.append(':');
     buf.append(minOpenTxn);
     if (exceptions.length == 0) {
-      buf.append(':');
+      buf.append(':');  // separator for open txns
+      buf.append(':');  // separator for aborted txns
     } else {
-      for(long except: exceptions) {
-        buf.append(':');
-        buf.append(except);
+      StringBuilder open = new StringBuilder();
+      StringBuilder abort = new StringBuilder();
+      for (int i = 0; i < exceptions.length; i++) {
+        if (abortedBits.get(i)) {
+          if (abort.length() > 0) {
+            abort.append(',');
+          }
+          abort.append(exceptions[i]);
+        } else {
+          if (open.length() > 0) {
+            open.append(',');
+          }
+          open.append(exceptions[i]);
+        }
       }
+      buf.append(':');
+      buf.append(open);
+      buf.append(':');
+      buf.append(abort);
     }
     return buf.toString();
   }
@@ -133,13 +145,41 @@ public class ValidReadTxnList implements ValidTxnList {
     if (src == null || src.length() == 0) {
       highWatermark = Long.MAX_VALUE;
       exceptions = new long[0];
+      abortedBits = new BitSet();
     } else {
       String[] values = src.split(":");
       highWatermark = Long.parseLong(values[0]);
       minOpenTxn = Long.parseLong(values[1]);
-      exceptions = new long[values.length - 2];
-      for(int i = 2; i < values.length; ++i) {
-        exceptions[i-2] = Long.parseLong(values[i]);
+      String[] openTxns = new String[0];
+      String[] abortedTxns = new String[0];
+      if (values.length < 3) {
+        openTxns = new String[0];
+        abortedTxns = new String[0];
+      } else if (values.length == 3) {
+        if (!values[2].isEmpty()) {
+          openTxns = values[2].split(",");
+        }
+      } else {
+        if (!values[2].isEmpty()) {
+          openTxns = values[2].split(",");
+        }
+        if (!values[3].isEmpty()) {
+          abortedTxns = values[3].split(",");
+        }
+      }
+      exceptions = new long[openTxns.length + abortedTxns.length];
+      int i = 0;
+      for (String open : openTxns) {
+        exceptions[i++] = Long.parseLong(open);
+      }
+      for (String abort : abortedTxns) {
+        exceptions[i++] = Long.parseLong(abort);
+      }
+      Arrays.sort(exceptions);
+      abortedBits = new BitSet(exceptions.length);
+      for (String abort : abortedTxns) {
+        int index = Arrays.binarySearch(exceptions, Long.parseLong(abort));
+        abortedBits.set(index);
       }
     }
   }
@@ -157,5 +197,40 @@ public class ValidReadTxnList implements ValidTxnList {
   public long getMinOpenTxn() {
     return minOpenTxn;
   }
+
+  @Override
+  public boolean isTxnAborted(long txnid) {
+    int index = Arrays.binarySearch(exceptions, txnid);
+    return index >= 0 && abortedBits.get(index);
+  }
+
+  @Override
+  public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) {
+    // check the easy cases first
+    if (highWatermark < minTxnId) {
+      return RangeResponse.NONE;
+    }
+
+    int count = 0;  // number of aborted txns found in exceptions
+
+    // traverse the aborted txns list, starting at first aborted txn index
+    for (int i = abortedBits.nextSetBit(0); i >= 0; i = abortedBits.nextSetBit(i + 1)) {
+      long abortedTxnId = exceptions[i];
+      if (abortedTxnId > maxTxnId) {  // we've already gone beyond the specified range
+        break;
+      }
+      if (abortedTxnId >= minTxnId && abortedTxnId <= maxTxnId) {
+        count++;
+      }
+    }
+
+    if (count == 0) {
+      return RangeResponse.NONE;
+    } else if (count == (maxTxnId - minTxnId + 1)) {
+      return RangeResponse.ALL;
+    } else {
+      return RangeResponse.SOME;
+    }
+  }
 }
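
To make the new serialization concrete, a small round-trip sketch (not part of the diff; the ids are illustrative). The string format is now highWatermark:minOpenTxn:<comma-separated open txns>:<comma-separated aborted txns>:

long[] exceptions = new long[] {3L, 5L, 7L};   // sorted; txn 5 is open, 3 and 7 are aborted
BitSet abortedBits = new BitSet(exceptions.length);
abortedBits.set(0);                            // txn 3
abortedBits.set(2);                            // txn 7
ValidReadTxnList txnList = new ValidReadTxnList(exceptions, abortedBits, 10L, 5L);

String serialized = txnList.writeToString();   // "10:5:5:3,7" for this snapshot
ValidReadTxnList copy = new ValidReadTxnList();
copy.readFromString(serialized);
assert copy.isTxnAborted(3L) && !copy.isTxnAborted(5L);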
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index 5e1e4ee..d4ac02c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -71,7 +71,7 @@ public interface ValidTxnList {
 
   /**
    * Populate this validTxnList from the string.  It is assumed that the string
-   * was created via {@link #writeToString()}.
+   * was created via {@link #writeToString()} and the exceptions list is sorted.
    * @param src source string.
    */
   public void readFromString(String src);
@@ -89,4 +89,20 @@ public interface ValidTxnList {
    * @return a list of invalid transaction ids
    */
   public long[] getInvalidTransactions();
+
+  /**
+   * Indicates whether a given transaction is aborted.
+   * @param txnid id for the transaction
+   * @return true if aborted, false otherwise
+   */
+  public boolean isTxnAborted(long txnid);
+
+  /**
+   * Find out if a range of transaction ids are aborted.
+   * @param minTxnId minimum txnid to look for, inclusive
+   * @param maxTxnId maximum txnid to look for, inclusive
+   * @return an indication of whether none, some, or all of these transactions are aborted.
+   */
+  public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId);
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java
new file mode 100644
index 0000000..0df6f4c
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Connection.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+public final class Connection implements Comparable<Connection>{
+  public final String type;
+  public final Vertex from;
+
+  public Connection(String type, Vertex from) {
+    super();
+    this.type = type;
+    this.from = from;
+  }
+
+  @Override
+  public int compareTo(Connection o) {
+    return from.compareTo(o.from);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java
new file mode 100644
index 0000000..1f01685
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParser.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.hive.common.jsonexplain.JsonParser;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class DagJsonParser implements JsonParser {
+  public final Map<String, Stage> stages = new LinkedHashMap<>();
+  protected final Logger LOG;
+  // the objects that have been printed.
+  public final Set<Object> printSet = new LinkedHashSet<>();
+  // the vertex that should be inlined. <Operator, list of Vertex that is
+  // inlined>
+  public final Map<Op, List<Connection>> inlineMap = new LinkedHashMap<>();
+
+  public DagJsonParser() {
+    super();
+    LOG = LoggerFactory.getLogger(this.getClass().getName());
+  }
+
+  public void extractStagesAndPlans(JSONObject inputObject) throws Exception {
+    // extract stages
+    JSONObject dependency = inputObject.getJSONObject("STAGE DEPENDENCIES");
+    if (dependency != null && dependency.length() > 0) {
+      // iterate for the first time to get all the names of stages.
+      for (String stageName : JSONObject.getNames(dependency)) {
+        this.stages.put(stageName, new Stage(stageName, this));
+      }
+      // iterate for the second time to get all the dependency.
+      for (String stageName : JSONObject.getNames(dependency)) {
+        JSONObject dependentStageNames = dependency.getJSONObject(stageName);
+        this.stages.get(stageName).addDependency(dependentStageNames, this.stages);
+      }
+    }
+    // extract stage plans
+    JSONObject stagePlans = inputObject.getJSONObject("STAGE PLANS");
+    if (stagePlans != null && stagePlans.length() > 0) {
+      for (String stageName : JSONObject.getNames(stagePlans)) {
+        JSONObject stagePlan = stagePlans.getJSONObject(stageName);
+        this.stages.get(stageName).extractVertex(stagePlan);
+      }
+    }
+  }
+
+  /**
+   * @param indentFlag
+   *          the indent level used to generate the correct indent
+   * @return the indentation prefix for the given indent level
+   */
+  public static String prefixString(int indentFlag) {
+    StringBuilder sb = new StringBuilder();
+    for (int index = 0; index < indentFlag; index++) {
+      sb.append("  ");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * @param indentFlag
+   * @param tail
+   *          the tail that replaces the trailing characters of the indent
+   * @return the indentation prefix ending with the given tail
+   */
+  public static String prefixString(int indentFlag, String tail) {
+    StringBuilder sb = new StringBuilder();
+    for (int index = 0; index < indentFlag; index++) {
+      sb.append("  ");
+    }
+    int len = sb.length();
+    return sb.replace(len - tail.length(), len, tail).toString();
+  }
+
+  @Override
+  public void print(JSONObject inputObject, PrintStream outputStream) throws Exception {
+    LOG.info("JsonParser is parsing:" + inputObject.toString());
+    this.extractStagesAndPlans(inputObject);
+    Printer printer = new Printer();
+    // print out the cbo info
+    if (inputObject.has("cboInfo")) {
+      printer.println(inputObject.getString("cboInfo"));
+      printer.println();
+    }
+    // print out the vertex dependency in root stage
+    for (Stage candidate : this.stages.values()) {
+      if (candidate.tezStageDependency != null && candidate.tezStageDependency.size() > 0) {
+        printer.println("Vertex dependency in root stage");
+        for (Entry<Vertex, List<Connection>> entry : candidate.tezStageDependency.entrySet()) {
+          StringBuilder sb = new StringBuilder();
+          sb.append(entry.getKey().name);
+          sb.append(" <- ");
+          boolean printcomma = false;
+          for (Connection connection : entry.getValue()) {
+            if (printcomma) {
+              sb.append(", ");
+            } else {
+              printcomma = true;
+            }
+            sb.append(connection.from.name + " (" + connection.type + ")");
+          }
+          printer.println(sb.toString());
+        }
+        printer.println();
+      }
+    }
+    // print out all the stages that have no childStages.
+    for (Stage candidate : this.stages.values()) {
+      if (candidate.childStages.isEmpty()) {
+        candidate.print(printer, 0);
+      }
+    }
+    outputStream.println(printer.toString());
+  }
+
+  public void addInline(Op op, Connection connection) {
+    List<Connection> list = inlineMap.get(op);
+    if (list == null) {
+      list = new ArrayList<>();
+      list.add(connection);
+      inlineMap.put(op, list);
+    } else {
+      list.add(connection);
+    }
+  }
+
+  public boolean isInline(Vertex v) {
+    for (List<Connection> list : inlineMap.values()) {
+      for (Connection connection : list) {
+        if (connection.from.equals(v)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public abstract String mapEdgeType(String edgeName);
+
+  public abstract String getFrameworkName();
+}
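
For orientation (not part of the diff), a concrete subclass such as the existing TezJsonParser consumes the JSON emitted by EXPLAIN FORMATTED, which carries the "STAGE DEPENDENCIES" and "STAGE PLANS" objects read above. A minimal driver sketch:

import java.io.PrintStream;
import org.apache.hadoop.hive.common.jsonexplain.tez.TezJsonParser;
import org.json.JSONObject;

public class ExplainPrinterSketch {
  // explainJson is assumed to hold the output of EXPLAIN FORMATTED <query>
  static void printUserLevelExplain(String explainJson, PrintStream out) throws Exception {
    new TezJsonParser().print(new JSONObject(explainJson), out);
  }
}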

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java
new file mode 100644
index 0000000..a518ac1
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/DagJsonParserUtils.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+public class DagJsonParserUtils {
+
+  public static List<String> OperatorNoStats = Arrays.asList(new String[] { "File Output Operator",
+      "Reduce Output Operator" });
+
+  public static String renameReduceOutputOperator(String operatorName, Vertex vertex) {
+    if (operatorName.equals("Reduce Output Operator") && vertex.edgeType != null) {
+      return vertex.edgeType;
+    } else {
+      return operatorName;
+    }
+  }
+
+  public static String attrsToString(Map<String, String> attrs) {
+    StringBuffer sb = new StringBuffer();
+    boolean first = true;
+    for (Entry<String, String> entry : attrs.entrySet()) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(",");
+      }
+      sb.append(entry.getKey() + entry.getValue());
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
index db118bf..2a5d47a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/JsonParserFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.common.jsonexplain;
 
+import org.apache.hadoop.hive.common.jsonexplain.spark.SparkJsonParser;
 import org.apache.hadoop.hive.common.jsonexplain.tez.TezJsonParser;
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -35,6 +36,9 @@ public class JsonParserFactory {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       return new TezJsonParser();
     }
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return new SparkJsonParser();
+    }
     return null;
   }
 }
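
A small selection sketch (not part of the diff; it assumes the factory method is getParser(HiveConf), which wraps the checks above, and uses the standard hive.execution.engine values):

HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
// "tez" yields a TezJsonParser, "spark" now yields a SparkJsonParser,
// anything else (e.g. "mr") yields null.
JsonParser parser = JsonParserFactory.getParser(conf);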

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java
new file mode 100644
index 0000000..03c5981
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Op.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.hive.common.jsonexplain.Vertex.VertexType;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public final class Op {
+  public final String name;
+  // tezJsonParser
+  public final DagJsonParser parser;
+  public final String operatorId;
+  public Op parent;
+  public final List<Op> children;
+  public final Map<String, String> attrs;
+  // the jsonObject for this operator
+  public final JSONObject opObject;
+  // the vertex that this operator belongs to
+  public final Vertex vertex;
+  // the vertex that this operator output to
+  public final String outputVertexName;
+  // the Operator type
+  public final OpType type;
+
+  public enum OpType {
+    MAPJOIN, MERGEJOIN, RS, OTHERS
+  };
+
+  public Op(String name, String id, String outputVertexName, List<Op> children,
+      Map<String, String> attrs, JSONObject opObject, Vertex vertex, DagJsonParser tezJsonParser)
+      throws JSONException {
+    super();
+    this.name = name;
+    this.operatorId = id;
+    this.type = deriveOpType(operatorId);
+    this.outputVertexName = outputVertexName;
+    this.children = children;
+    this.attrs = attrs;
+    this.opObject = opObject;
+    this.vertex = vertex;
+    this.parser = tezJsonParser;
+  }
+
+  private OpType deriveOpType(String operatorId) {
+    if (operatorId != null) {
+      if (operatorId.startsWith(OpType.MAPJOIN.toString())) {
+        return OpType.MAPJOIN;
+      } else if (operatorId.startsWith(OpType.MERGEJOIN.toString())) {
+        return OpType.MERGEJOIN;
+      } else if (operatorId.startsWith(OpType.RS.toString())) {
+        return OpType.RS;
+      } else {
+        return OpType.OTHERS;
+      }
+    } else {
+      return OpType.OTHERS;
+    }
+  }
+
+  private void inlineJoinOp() throws Exception {
+    // inline map join operator
+    if (this.type == OpType.MAPJOIN) {
+      JSONObject joinObj = opObject.getJSONObject(this.name);
+      // get the map for posToVertex
+      Map<String, Vertex> posToVertex = new LinkedHashMap<>();
+      if (joinObj.has("input vertices:")) {
+        JSONObject verticeObj = joinObj.getJSONObject("input vertices:");
+        for (String pos : JSONObject.getNames(verticeObj)) {
+          String vertexName = verticeObj.getString(pos);
+          // update the connection
+          Connection c = null;
+          for (Connection connection : vertex.parentConnections) {
+            if (connection.from.name.equals(vertexName)) {
+              posToVertex.put(pos, connection.from);
+              c = connection;
+              break;
+            }
+          }
+          if (c != null) {
+            parser.addInline(this, c);
+          }
+        }
+        // update the attrs
+        this.attrs.remove("input vertices:");
+      }
+      // update the keys to use operator name
+      JSONObject keys = joinObj.getJSONObject("keys:");
+      // find out the vertex for the big table
+      Set<Vertex> parentVertexes = new HashSet<>();
+      for (Connection connection : vertex.parentConnections) {
+        parentVertexes.add(connection.from);
+      }
+      parentVertexes.removeAll(posToVertex.values());
+      Map<String, String> posToOpId = new LinkedHashMap<>();
+      if (keys.length() != 0) {
+        for (String key : JSONObject.getNames(keys)) {
+          // first search from the posToVertex
+          if (posToVertex.containsKey(key)) {
+            Vertex v = posToVertex.get(key);
+            if (v.rootOps.size() == 1) {
+              posToOpId.put(key, v.rootOps.get(0).operatorId);
+            } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
+              posToOpId.put(key, v.name);
+            } else {
+              Op joinRSOp = v.getJoinRSOp(vertex);
+              if (joinRSOp != null) {
+                posToOpId.put(key, joinRSOp.operatorId);
+              } else {
+                throw new Exception(
+                    "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
+                        + " when hive explain user is trying to identify the operator id.");
+              }
+            }
+          }
+          // then search from parent
+          else if (parent != null) {
+            posToOpId.put(key, parent.operatorId);
+          }
+          // then assume it is from its own vertex
+          else if (parentVertexes.size() == 1) {
+            Vertex v = parentVertexes.iterator().next();
+            parentVertexes.clear();
+            if (v.rootOps.size() == 1) {
+              posToOpId.put(key, v.rootOps.get(0).operatorId);
+            } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
+              posToOpId.put(key, v.name);
+            } else {
+              Op joinRSOp = v.getJoinRSOp(vertex);
+              if (joinRSOp != null) {
+                posToOpId.put(key, joinRSOp.operatorId);
+              } else {
+                throw new Exception(
+                    "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
+                        + " when hive explain user is trying to identify the operator id.");
+              }
+            }
+          }
+          // finally throw an exception
+          else {
+            throw new Exception(
+                "Can not find the source operator on one of the branches of map join.");
+          }
+        }
+      }
+      this.attrs.remove("keys:");
+      StringBuffer sb = new StringBuffer();
+      JSONArray conditionMap = joinObj.getJSONArray("condition map:");
+      for (int index = 0; index < conditionMap.length(); index++) {
+        JSONObject cond = conditionMap.getJSONObject(index);
+        String k = (String) cond.keys().next();
+        JSONObject condObject = new JSONObject((String)cond.get(k));
+        String type = condObject.getString("type");
+        String left = condObject.getString("left");
+        String right = condObject.getString("right");
+        if (keys.length() != 0) {
+          sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "."
+              + keys.get(right) + "(" + type + "),");
+        } else {
+          // probably a cross product
+          sb.append("(" + type + "),");
+        }
+      }
+      this.attrs.remove("condition map:");
+      this.attrs.put("Conds:", sb.substring(0, sb.length() - 1));
+    }
+    // should be merge join
+    else {
+      Map<String, String> posToOpId = new LinkedHashMap<>();
+      if (vertex.mergeJoinDummyVertexs.size() == 0) {
+        if (vertex.tagToInput.size() != vertex.parentConnections.size()) {
+          throw new Exception("tagToInput size " + vertex.tagToInput.size()
+              + " is different from parentConnections size " + vertex.parentConnections.size());
+        }
+        for (Entry<String, String> entry : vertex.tagToInput.entrySet()) {
+          Connection c = null;
+          for (Connection connection : vertex.parentConnections) {
+            if (connection.from.name.equals(entry.getValue())) {
+              Vertex v = connection.from;
+              if (v.rootOps.size() == 1) {
+                posToOpId.put(entry.getKey(), v.rootOps.get(0).operatorId);
+              } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
+                posToOpId.put(entry.getKey(), v.name);
+              } else {
+                Op joinRSOp = v.getJoinRSOp(vertex);
+                if (joinRSOp != null) {
+                  posToOpId.put(entry.getKey(), joinRSOp.operatorId);
+                } else {
+                  throw new Exception(
+                      "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
+                          + " when hive explain user is trying to identify the operator id.");
+                }
+              }
+              c = connection;
+              break;
+            }
+          }
+          if (c == null) {
+            throw new Exception("Can not find " + entry.getValue()
+                + " while parsing keys of merge join operator");
+          }
+        }
+      } else {
+        posToOpId.put(vertex.tag, this.parent.operatorId);
+        for (Vertex v : vertex.mergeJoinDummyVertexs) {
+          if (v.rootOps.size() != 1) {
+            throw new Exception("Can not find a single root operators in a single vertex " + v.name
+                + " when hive explain user is trying to identify the operator id.");
+          }
+          posToOpId.put(v.tag, v.rootOps.get(0).operatorId);
+        }
+      }
+      JSONObject joinObj = opObject.getJSONObject(this.name);
+      // update the keys to use operator name
+      JSONObject keys = joinObj.getJSONObject("keys:");
+      if (keys.length() != 0) {
+        for (String key : JSONObject.getNames(keys)) {
+          if (!posToOpId.containsKey(key)) {
+            throw new Exception(
+                "Can not find the source operator on one of the branches of merge join.");
+          }
+        }
+        // inline merge join operator in a self-join
+        if (this.vertex != null) {
+          for (Vertex v : this.vertex.mergeJoinDummyVertexs) {
+            parser.addInline(this, new Connection(null, v));
+          }
+        }
+      }
+      // update the attrs
+      this.attrs.remove("keys:");
+      StringBuffer sb = new StringBuffer();
+      JSONArray conditionMap = joinObj.getJSONArray("condition map:");
+      for (int index = 0; index < conditionMap.length(); index++) {
+        JSONObject cond = conditionMap.getJSONObject(index);
+        String k = (String) cond.keys().next();
+        JSONObject condObject = new JSONObject((String)cond.get(k));
+        String type = condObject.getString("type");
+        String left = condObject.getString("left");
+        String right = condObject.getString("right");
+        if (keys.length() != 0) {
+          sb.append(posToOpId.get(left) + "." + keys.get(left) + "=" + posToOpId.get(right) + "."
+              + keys.get(right) + "(" + type + "),");
+        } else {
+          // probably a cross product
+          sb.append("(" + type + "),");
+        }
+      }
+      this.attrs.remove("condition map:");
+      this.attrs.put("Conds:", sb.substring(0, sb.length() - 1));
+    }
+  }
+
+  private String getNameWithOpIdStats() {
+    StringBuffer sb = new StringBuffer();
+    sb.append(DagJsonParserUtils.renameReduceOutputOperator(name, vertex));
+    if (operatorId != null) {
+      sb.append(" [" + operatorId + "]");
+    }
+    if (!DagJsonParserUtils.OperatorNoStats.contains(name) && attrs.containsKey("Statistics:")) {
+      sb.append(" (" + attrs.get("Statistics:") + ")");
+    }
+    attrs.remove("Statistics:");
+    return sb.toString();
+  }
+
+  /**
+   * @param printer
+   * @param indentFlag
+   * @param branchOfJoinOp
+   *          This parameter is used to show if it is a branch of a Join
+   *          operator so that we can decide the corresponding indent.
+   * @throws Exception
+   */
+  public void print(Printer printer, int indentFlag, boolean branchOfJoinOp) throws Exception {
+    // print name
+    if (parser.printSet.contains(this)) {
+      printer.println(DagJsonParser.prefixString(indentFlag) + " Please refer to the previous "
+          + this.getNameWithOpIdStats());
+      return;
+    }
+    parser.printSet.add(this);
+    if (!branchOfJoinOp) {
+      printer.println(DagJsonParser.prefixString(indentFlag) + this.getNameWithOpIdStats());
+    } else {
+      printer.println(DagJsonParser.prefixString(indentFlag, "<-") + this.getNameWithOpIdStats());
+    }
+    branchOfJoinOp = false;
+    // if this operator is a Map Join Operator or a Merge Join Operator
+    if (this.type == OpType.MAPJOIN || this.type == OpType.MERGEJOIN) {
+      inlineJoinOp();
+      branchOfJoinOp = true;
+    }
+    // if this operator is the last operator, we summarize the non-inlined
+    // vertex
+    List<Connection> noninlined = new ArrayList<>();
+    if (this.parent == null) {
+      if (this.vertex != null) {
+        for (Connection connection : this.vertex.parentConnections) {
+          if (!parser.isInline(connection.from)) {
+            noninlined.add(connection);
+          }
+        }
+      }
+    }
+    // print attr
+    indentFlag++;
+    if (!attrs.isEmpty()) {
+      printer.println(DagJsonParser.prefixString(indentFlag)
+          + DagJsonParserUtils.attrsToString(attrs));
+    }
+    // print inline vertex
+    if (parser.inlineMap.containsKey(this)) {
+      List<Connection> connections = parser.inlineMap.get(this);
+      Collections.sort(connections);
+      for (Connection connection : connections) {
+        connection.from.print(printer, indentFlag, connection.type, this.vertex);
+      }
+    }
+    // print parent op, i.e., where data comes from
+    if (this.parent != null) {
+      this.parent.print(printer, indentFlag, branchOfJoinOp);
+    }
+    // print next vertex
+    else {
+      Collections.sort(noninlined);
+      for (Connection connection : noninlined) {
+        connection.from.print(printer, indentFlag, connection.type, this.vertex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java
new file mode 100644
index 0000000..6f040f6
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Printer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+public final class Printer {
+  public static final String lineSeparator = System.getProperty("line.separator");
+  private final StringBuilder builder = new StringBuilder();
+
+  public void print(String string) {
+    builder.append(string);
+  }
+
+  public void println(String string) {
+    builder.append(string);
+    builder.append(lineSeparator);
+  }
+
+  public void println() {
+    builder.append(lineSeparator);
+  }
+
+  public String toString() {
+    return builder.toString();
+  }
+}
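
For context, a minimal usage sketch of the Printer buffer added above (the class name PrinterSketch and the sample strings are illustrative, not part of the patch): print() appends text as-is, println() appends the platform line separator, and toString() returns the accumulated plan text.

    import org.apache.hadoop.hive.common.jsonexplain.Printer;

    public class PrinterSketch {
      public static void main(String[] args) {
        Printer printer = new Printer();
        printer.println("Stage-1");             // appends the line plus the platform separator
        printer.print("  Map 1");               // appends without a separator
        printer.println();                      // terminates the pending line
        System.out.print(printer.toString());   // emits the accumulated text
      }
    }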

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java
new file mode 100644
index 0000000..d21a565
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Stage.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.jsonexplain.Vertex.VertexType;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public final class Stage {
+  // external name is what is shown at the console
+  String externalName;
+  // internal name is used to track the stages
+  public final String internalName;
+  // the DAG JSON parser this stage belongs to
+  public final DagJsonParser parser;
+  // upstream stages, e.g., root stage
+  public final List<Stage> parentStages = new ArrayList<>();
+  // downstream stages.
+  public final List<Stage> childStages = new ArrayList<>();
+  public final Map<String, Vertex> vertexs = new LinkedHashMap<>();
+  public final Map<String, String> attrs = new TreeMap<>();
+  Map<Vertex, List<Connection>> tezStageDependency;
+  // some stage may contain only a single operator, e.g., create table operator,
+  // fetch operator.
+  Op op;
+
+  public Stage(String name, DagJsonParser tezJsonParser) {
+    super();
+    internalName = name;
+    externalName = name;
+    parser = tezJsonParser;
+  }
+
+  public void addDependency(JSONObject object, Map<String, Stage> stages) throws JSONException {
+    if (object.has("DEPENDENT STAGES")) {
+      String names = object.getString("DEPENDENT STAGES");
+      for (String name : names.split(",")) {
+        Stage parent = stages.get(name.trim());
+        this.parentStages.add(parent);
+        parent.childStages.add(this);
+      }
+    }
+    if (object.has("CONDITIONAL CHILD TASKS")) {
+      String names = object.getString("CONDITIONAL CHILD TASKS");
+      this.externalName = this.internalName + "(CONDITIONAL CHILD TASKS: " + names + ")";
+      for (String name : names.split(",")) {
+        Stage child = stages.get(name.trim());
+        child.externalName = child.internalName + "(CONDITIONAL)";
+        child.parentStages.add(this);
+        this.childStages.add(child);
+      }
+    }
+  }
+
+  /**
+   * If the stage object contains the framework key (e.g., "Tez"), we extract
+   * the vertices and edges; otherwise we directly extract the operators
+   * and/or attributes.
+   * @param object the JSON object describing this stage
+   * @throws Exception
+   */
+  public void extractVertex(JSONObject object) throws Exception {
+    if (object.has(this.parser.getFrameworkName())) {
+      this.tezStageDependency = new TreeMap<>();
+      JSONObject tez = (JSONObject) object.get(this.parser.getFrameworkName());
+      JSONObject vertices = tez.getJSONObject("Vertices:");
+      if (tez.has("Edges:")) {
+        JSONObject edges = tez.getJSONObject("Edges:");
+        // iterate for the first time to get all the vertices
+        for (String to : JSONObject.getNames(edges)) {
+          vertexs.put(to, new Vertex(to, vertices.getJSONObject(to), parser));
+        }
+        // iterate for the second time to get all the vertex dependencies
+        for (String to : JSONObject.getNames(edges)) {
+          Object o = edges.get(to);
+          Vertex v = vertexs.get(to);
+          // 1 to 1 mapping
+          if (o instanceof JSONObject) {
+            JSONObject obj = (JSONObject) o;
+            String parent = obj.getString("parent");
+            Vertex parentVertex = vertexs.get(parent);
+            if (parentVertex == null) {
+              parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
+              vertexs.put(parent, parentVertex);
+            }
+            String type = obj.getString("type");
+            // for union vertex, we reverse the dependency relationship
+            if (!"CONTAINS".equals(type)) {
+              v.addDependency(new Connection(type, parentVertex));
+              parentVertex.setType(type);
+              parentVertex.children.add(v);
+            } else {
+              parentVertex.addDependency(new Connection(type, v));
+              v.children.add(parentVertex);
+            }
+            this.tezStageDependency.put(v, Arrays.asList(new Connection(type, parentVertex)));
+          } else {
+            // 1 to many mapping
+            JSONArray from = (JSONArray) o;
+            List<Connection> list = new ArrayList<>();
+            for (int index = 0; index < from.length(); index++) {
+              JSONObject obj = from.getJSONObject(index);
+              String parent = obj.getString("parent");
+              Vertex parentVertex = vertexs.get(parent);
+              if (parentVertex == null) {
+                parentVertex = new Vertex(parent, vertices.getJSONObject(parent), parser);
+                vertexs.put(parent, parentVertex);
+              }
+              String type = obj.getString("type");
+              if (!"CONTAINS".equals(type)) {
+                v.addDependency(new Connection(type, parentVertex));
+                parentVertex.setType(type);
+                parentVertex.children.add(v);
+              } else {
+                parentVertex.addDependency(new Connection(type, v));
+                v.children.add(parentVertex);
+              }
+              list.add(new Connection(type, parentVertex));
+            }
+            this.tezStageDependency.put(v, list);
+          }
+        }
+      } else {
+        for (String vertexName : JSONObject.getNames(vertices)) {
+          vertexs.put(vertexName, new Vertex(vertexName, vertices.getJSONObject(vertexName), parser));
+        }
+      }
+      // extract the operator tree of each MAP/REDUCE vertex
+      for (Vertex v : vertexs.values()) {
+        if (v.vertexType == VertexType.MAP || v.vertexType == VertexType.REDUCE) {
+          v.extractOpTree();
+          v.checkMultiReduceOperator();
+        }
+      }
+    } else {
+      String[] names = JSONObject.getNames(object);
+      if (names != null) {
+        for (String name : names) {
+          if (name.contains("Operator")) {
+            this.op = extractOp(name, object.getJSONObject(name));
+          } else {
+            if (!object.get(name).toString().isEmpty()) {
+              attrs.put(name, object.get(name).toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * This method addresses stages that consist of a single operator, e.g., the
+   * create table operator or the fetch operator.
+   * @param opName the name of the operator
+   * @param opObj the JSON object describing the operator
+   * @return the extracted operator
+   * @throws Exception
+   */
+  Op extractOp(String opName, JSONObject opObj) throws Exception {
+    Map<String, String> attrs = new TreeMap<>();
+    Vertex v = null;
+    if (opObj.length() > 0) {
+      String[] names = JSONObject.getNames(opObj);
+      for (String name : names) {
+        Object o = opObj.get(name);
+        if (isPrintable(o) && !o.toString().isEmpty()) {
+          attrs.put(name, o.toString());
+        } else if (o instanceof JSONObject) {
+          JSONObject attrObj = (JSONObject) o;
+          if (attrObj.length() > 0) {
+            if (name.equals("Processor Tree:")) {
+              JSONObject object = new JSONObject(new LinkedHashMap<>());
+              object.put(name, attrObj);
+              v = new Vertex(null, object, parser);
+              v.extractOpTree();
+            } else {
+              for (String attrName : JSONObject.getNames(attrObj)) {
+                if (!attrObj.get(attrName).toString().isEmpty()) {
+                  attrs.put(attrName, attrObj.get(attrName).toString());
+                }
+              }
+            }
+          }
+        } else {
+          throw new Exception("Unsupported object in " + this.internalName);
+        }
+      }
+    }
+    Op op = new Op(opName, null, null, null, attrs, null, v, parser);
+    if (v != null) {
+      parser.addInline(op, new Connection(null, v));
+    }
+    return op;
+  }
+
+  private boolean isPrintable(Object val) {
+    if (val instanceof Boolean || val instanceof String || val instanceof Integer
+        || val instanceof Long || val instanceof Byte || val instanceof Float
+        || val instanceof Double || val instanceof Path) {
+      return true;
+    }
+    if (val != null && val.getClass().isPrimitive()) {
+      return true;
+    }
+    return false;
+  }
+
+  public void print(Printer printer, int indentFlag) throws Exception {
+    // print stage name
+    if (parser.printSet.contains(this)) {
+      printer.println(DagJsonParser.prefixString(indentFlag) + " Please refer to the previous "
+          + externalName);
+      return;
+    }
+    parser.printSet.add(this);
+    printer.println(DagJsonParser.prefixString(indentFlag) + externalName);
+    // print vertices
+    indentFlag++;
+    for (Vertex candidate : this.vertexs.values()) {
+      if (!parser.isInline(candidate) && candidate.children.isEmpty()) {
+        candidate.print(printer, indentFlag, null, null);
+      }
+    }
+    if (!attrs.isEmpty()) {
+      printer.println(DagJsonParser.prefixString(indentFlag)
+          + DagJsonParserUtils.attrsToString(attrs));
+    }
+    if (op != null) {
+      op.print(printer, indentFlag, false);
+    }
+    indentFlag++;
+    // print dependent stages
+    for (Stage stage : this.parentStages) {
+      stage.print(printer, indentFlag);
+    }
+  }
+}
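
As a rough sketch of how Stage.addDependency() wires stages together (the stage names here are made up, and SparkJsonParser is used only because it is the concrete DagJsonParser visible elsewhere in this patch):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.common.jsonexplain.Stage;
    import org.apache.hadoop.hive.common.jsonexplain.spark.SparkJsonParser;
    import org.json.JSONObject;

    public class StageDependencySketch {
      public static void main(String[] args) throws Exception {
        SparkJsonParser parser = new SparkJsonParser();
        Map<String, Stage> stages = new HashMap<>();
        stages.put("Stage-1", new Stage("Stage-1", parser));
        stages.put("Stage-2", new Stage("Stage-2", parser));
        // the JSON object of Stage-2 declares that it depends on Stage-1
        JSONObject stage2Json = new JSONObject().put("DEPENDENT STAGES", "Stage-1");
        stages.get("Stage-2").addDependency(stage2Json, stages);
        // Stage-1 is now a parent of Stage-2, and Stage-2 a child of Stage-1
        System.out.println(stages.get("Stage-2").parentStages.get(0).internalName);  // Stage-1
      }
    }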

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
new file mode 100644
index 0000000..c93059d
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/Vertex.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.common.jsonexplain.Op.OpType;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public final class Vertex implements Comparable<Vertex> {
+  public final String name;
+  // the DAG JSON parser this vertex belongs to
+  public final DagJsonParser parser;
+  // vertex's parent connections.
+  public final List<Connection> parentConnections = new ArrayList<>();
+  // vertex's child vertices.
+  public final List<Vertex> children = new ArrayList<>();
+  // the jsonObject for this vertex
+  public final JSONObject vertexObject;
+  // whether this vertex is a dummy (i.e., it does not really exist but is created),
+  // e.g., a dummy vertex for a merge join branch
+  public boolean dummy;
+  // the rootOps in this vertex
+  public final List<Op> rootOps = new ArrayList<>();
+  // the dummy vertices we create for the merge join branches (e.g., for a self
+  // join) if this vertex is a merge join vertex
+  public final List<Vertex> mergeJoinDummyVertexs = new ArrayList<>();
+  // the number of reduce output (RS) operators among this vertex's root operators
+  public int numReduceOp = 0;
+  // execution mode
+  public String executionMode = "";
+  // tagToInput for reduce work
+  public Map<String, String> tagToInput = new LinkedHashMap<>();
+  // tag
+  public String tag;
+
+  public static enum VertexType {
+    MAP, REDUCE, UNION, UNKNOWN
+  };
+  public VertexType vertexType;
+
+  public static enum EdgeType {
+    BROADCAST, SHUFFLE, MULTICAST, PARTITION_ONLY_SHUFFLE, UNKNOWN
+  };
+  public String edgeType;
+
+  public Vertex(String name, JSONObject vertexObject, DagJsonParser dagJsonParser) {
+    super();
+    this.name = name;
+    if (this.name != null) {
+      if (this.name.contains("Map")) {
+        this.vertexType = VertexType.MAP;
+      } else if (this.name.contains("Reduce")) {
+        this.vertexType = VertexType.REDUCE;
+      } else if (this.name.contains("Union")) {
+        this.vertexType = VertexType.UNION;
+      } else {
+        this.vertexType = VertexType.UNKNOWN;
+      }
+    } else {
+      this.vertexType = VertexType.UNKNOWN;
+    }
+    this.dummy = false;
+    this.vertexObject = vertexObject;
+    parser = dagJsonParser;
+  }
+
+  public void addDependency(Connection connection) throws JSONException {
+    this.parentConnections.add(connection);
+  }
+
+  /**
+   * Extracts the operator tree of this vertex. We assume that there is a single
+   * top-level Map Operator Tree or Reduce Operator Tree in a vertex.
+   * @throws JSONException
+   * @throws JsonParseException
+   * @throws JsonMappingException
+   * @throws IOException
+   * @throws Exception
+   */
+  public void extractOpTree() throws JSONException, JsonParseException, JsonMappingException,
+      IOException, Exception {
+    if (vertexObject.length() != 0) {
+      for (String key : JSONObject.getNames(vertexObject)) {
+        if (key.equals("Map Operator Tree:")) {
+          extractOp(vertexObject.getJSONArray(key).getJSONObject(0));
+        } else if (key.equals("Reduce Operator Tree:") || key.equals("Processor Tree:")) {
+          extractOp(vertexObject.getJSONObject(key));
+        } else if (key.equals("Join:")) {
+          // this is the case when we have a map-side SMB join
+          // one input of the join is treated as a dummy vertex
+          JSONArray array = vertexObject.getJSONArray(key);
+          for (int index = 0; index < array.length(); index++) {
+            JSONObject mpOpTree = array.getJSONObject(index);
+            Vertex v = new Vertex(null, mpOpTree, parser);
+            v.extractOpTree();
+            v.dummy = true;
+            mergeJoinDummyVertexs.add(v);
+          }
+        } else if (key.equals("Merge File Operator")) {
+          JSONObject opTree = vertexObject.getJSONObject(key);
+          if (opTree.has("Map Operator Tree:")) {
+            extractOp(opTree.getJSONArray("Map Operator Tree:").getJSONObject(0));
+          } else {
+            throw new Exception("Merge File Operator does not have a Map Operator Tree");
+          }
+        } else if (key.equals("Execution mode:")) {
+          executionMode = " " + vertexObject.getString(key);
+        } else if (key.equals("tagToInput:")) {
+          JSONObject tagToInput = vertexObject.getJSONObject(key);
+          for (String tag : JSONObject.getNames(tagToInput)) {
+            this.tagToInput.put(tag, (String) tagToInput.get(tag));
+          }
+        } else if (key.equals("tag:")) {
+          this.tag = vertexObject.getString(key);
+        } else if (key.equals("Local Work:")) {
+          extractOp(vertexObject.getJSONObject(key));
+        } else {
+          throw new Exception("Unsupported operator tree in vertex " + this.name);
+        }
+      }
+    }
+  }
+
+  /**
+   * Extracts a single operator and, recursively, its children from the given
+   * JSON object. Assumption: each operator only has one parent but may have
+   * many children.
+   * @param operator the JSON object describing the operator
+   * @return the extracted operator
+   * @throws JSONException
+   * @throws JsonParseException
+   * @throws JsonMappingException
+   * @throws IOException
+   * @throws Exception
+   */
+  Op extractOp(JSONObject operator) throws JSONException, JsonParseException, JsonMappingException,
+      IOException, Exception {
+    String[] names = JSONObject.getNames(operator);
+    if (names.length != 1) {
+      throw new Exception("Expect only one operator in " + operator.toString());
+    } else {
+      String opName = names[0];
+      JSONObject attrObj = (JSONObject) operator.get(opName);
+      Map<String, String> attrs = new TreeMap<>();
+      List<Op> children = new ArrayList<>();
+      String id = null;
+      String outputVertexName = null;
+      if (JSONObject.getNames(attrObj) != null) {
+        for (String attrName : JSONObject.getNames(attrObj)) {
+          if (attrName.equals("children")) {
+            Object childrenObj = attrObj.get(attrName);
+            if (childrenObj instanceof JSONObject) {
+              if (((JSONObject) childrenObj).length() != 0) {
+                children.add(extractOp((JSONObject) childrenObj));
+              }
+            } else if (childrenObj instanceof JSONArray) {
+              if (((JSONArray) childrenObj).length() != 0) {
+                JSONArray array = ((JSONArray) childrenObj);
+                for (int index = 0; index < array.length(); index++) {
+                  children.add(extractOp(array.getJSONObject(index)));
+                }
+              }
+            } else {
+              throw new Exception("Unsupported operator " + this.name
+                      + "'s children operator is neither a jsonobject nor a jsonarray");
+            }
+          } else {
+            if (attrName.equals("OperatorId:")) {
+              id = attrObj.get(attrName).toString();
+            } else if (attrName.equals("outputname:")) {
+              outputVertexName = attrObj.get(attrName).toString();
+            } else {
+              if (!attrObj.get(attrName).toString().isEmpty()) {
+                attrs.put(attrName, attrObj.get(attrName).toString());
+              }
+            }
+          }
+        }
+      }
+      Op op = new Op(opName, id, outputVertexName, children, attrs, operator, this, parser);
+      if (!children.isEmpty()) {
+        for (Op child : children) {
+          child.parent = op;
+        }
+      } else {
+        this.rootOps.add(op);
+      }
+      return op;
+    }
+  }
+
+  public void print(Printer printer, int indentFlag, String type, Vertex callingVertex)
+      throws JSONException, Exception {
+    // print vertex name
+    if (parser.printSet.contains(this) && numReduceOp <= 1) {
+      if (type != null) {
+        printer.println(DagJsonParser.prefixString(indentFlag, "<-")
+            + " Please refer to the previous " + this.name + " [" + type + "]");
+      } else {
+        printer.println(DagJsonParser.prefixString(indentFlag, "<-")
+            + " Please refer to the previous " + this.name);
+      }
+      return;
+    }
+    parser.printSet.add(this);
+    if (type != null) {
+      printer.println(DagJsonParser.prefixString(indentFlag, "<-") + this.name + " [" + type + "]"
+          + this.executionMode);
+    } else if (this.name != null) {
+      printer.println(DagJsonParser.prefixString(indentFlag) + this.name + this.executionMode);
+    }
+    // print operators
+    if (numReduceOp > 1 && callingVertex.vertexType != VertexType.UNION) {
+      // find the right op
+      Op choose = null;
+      for (Op op : this.rootOps) {
+        if (op.outputVertexName.equals(callingVertex.name)) {
+          choose = op;
+        }
+      }
+      if (choose != null) {
+        choose.print(printer, indentFlag, false);
+      } else {
+        throw new Exception("Can not find the right reduce output operator for vertex " + this.name);
+      }
+    } else {
+      for (Op op : this.rootOps) {
+        // dummy vertex is treated as a branch of a join operator
+        if (this.dummy) {
+          op.print(printer, indentFlag, true);
+        } else {
+          op.print(printer, indentFlag, false);
+        }
+      }
+    }
+    if (vertexType == VertexType.UNION) {
+      // print dependent vertices
+      indentFlag++;
+      for (int index = 0; index < this.parentConnections.size(); index++) {
+        Connection connection = this.parentConnections.get(index);
+        connection.from.print(printer, indentFlag, connection.type, this);
+      }
+    }
+  }
+
+  /**
+   * We check if a vertex has multiple reduce operators.
+   */
+  public void checkMultiReduceOperator() {
+    // a vertex can only have multiple reduce outputs if it has at least two root operators
+    if (this.rootOps.size() < 2) {
+      return;
+    }
+    // count the root operators that are reduce output (RS) operators
+    for (Op op : this.rootOps) {
+      if (op.type == OpType.RS) {
+        numReduceOp++;
+      }
+    }
+  }
+
+  public void setType(String type) {
+    this.edgeType = this.parser.mapEdgeType(type);
+  }
+
+  // The following code should be gone after HIVE-11075 using topological order
+  @Override
+  public int compareTo(Vertex o) {
+    // we print the vertex that has more RS operators before the vertex that has fewer.
+    if (numReduceOp != o.numReduceOp) {
+      return -(numReduceOp - o.numReduceOp);
+    } else {
+      return this.name.compareTo(o.name);
+    }
+  }
+
+  public Op getJoinRSOp(Vertex joinVertex) {
+    if (rootOps.size() == 0) {
+      return null;
+    } else if (rootOps.size() == 1) {
+      if (rootOps.get(0).type == OpType.RS) {
+        return rootOps.get(0);
+      } else {
+        return null;
+      }
+    } else {
+      for (Op op : rootOps) {
+        if (op.type == OpType.RS) {
+          if (op.outputVertexName.equals(joinVertex.name)) {
+            return op;
+          }
+        }
+      }
+      return null;
+    }
+  }
+}
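
A small sketch of the ordering imposed by Vertex.compareTo() when non-inlined vertices are sorted before printing: vertices with more reduce output (RS) root operators come first, ties are broken by name. The vertex names are made up, SparkJsonParser again stands in as the concrete DagJsonParser, and numReduceOp is set by hand instead of via checkMultiReduceOperator().

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.hive.common.jsonexplain.Vertex;
    import org.apache.hadoop.hive.common.jsonexplain.spark.SparkJsonParser;
    import org.json.JSONObject;

    public class VertexOrderSketch {
      public static void main(String[] args) {
        SparkJsonParser parser = new SparkJsonParser();
        Vertex reducer = new Vertex("Reducer 2", new JSONObject(), parser);
        Vertex mapper = new Vertex("Map 1", new JSONObject(), parser);
        reducer.numReduceOp = 2;   // as if checkMultiReduceOperator() had counted two RS root operators
        mapper.numReduceOp = 0;
        List<Vertex> vertices = new ArrayList<>();
        vertices.add(mapper);
        vertices.add(reducer);
        Collections.sort(vertices);
        System.out.println(vertices.get(0).name);  // Reducer 2 -- more RS operators sort first
      }
    }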

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java
new file mode 100644
index 0000000..9485aa4
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/spark/SparkJsonParser.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.jsonexplain.spark;
+
+import org.apache.hadoop.hive.common.jsonexplain.DagJsonParser;
+
+
+public class SparkJsonParser extends DagJsonParser {
+
+  @Override
+  public String mapEdgeType(String edgeName) {
+    return edgeName;
+  }
+
+  @Override
+  public String getFrameworkName() {
+    return "Spark";
+  }
+}
\ No newline at end of file
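
A trivial sketch of what the two overrides amount to: the Spark parser keys the plan JSON on "Spark" and passes edge names through unchanged (the edge name below is just an example string):

    import org.apache.hadoop.hive.common.jsonexplain.spark.SparkJsonParser;

    public class SparkParserSketch {
      public static void main(String[] args) {
        SparkJsonParser parser = new SparkJsonParser();
        System.out.println(parser.getFrameworkName());      // Spark
        System.out.println(parser.mapEdgeType("SHUFFLE"));   // SHUFFLE (identity mapping)
      }
    }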

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
deleted file mode 100644
index d341cb1..0000000
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common.jsonexplain.tez;
-
-public final class Connection {
-  public final String type;
-  public final Vertex from;
-
-  public Connection(String type, Vertex from) {
-    super();
-    this.type = type;
-    this.from = from;
-  }
-}

