hive-commits mailing list archives

From jd...@apache.org
Subject [26/51] [abbrv] hive git commit: HIVE-12558: LLAP: output QueryFragmentCounters somewhere (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Date Thu, 17 Mar 2016 22:47:30 GMT
HIVE-12558: LLAP: output QueryFragmentCounters somewhere (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6023c79
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6023c79
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6023c79

Branch: refs/heads/llap
Commit: b6023c796f0daa37aef2a59b57aa1a29548c8211
Parents: 456a91e
Author: Prasanth Jayachandran <j.prasanth.j@gmail.com>
Authored: Thu Mar 10 21:31:39 2016 -0600
Committer: Prasanth Jayachandran <j.prasanth.j@gmail.com>
Committed: Thu Mar 10 21:31:39 2016 -0600

----------------------------------------------------------------------
 .../TestOperationLoggingAPIWithTez.java         |   2 +-
 .../hive/llap/counters/LlapIOCounters.java      |  37 +++++
 .../hive/llap/counters/FragmentCountersMap.java |  46 +++++++
 .../llap/counters/QueryFragmentCounters.java    |  65 ++++-----
 .../hive/llap/daemon/impl/LlapTaskReporter.java |  14 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  14 +-
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  30 +++-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   9 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |  31 ++---
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  | 136 +++++++++++++++----
 10 files changed, 291 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
index bee1447..8b5b516 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
@@ -54,7 +54,7 @@ public class TestOperationLoggingAPIWithTez extends OperationLoggingAPITestBase
       "org.apache.tez.common.counters.DAGCounter",
       "NUM_SUCCEEDED_TASKS",
       "TOTAL_LAUNCHED_TASKS",
-      "CPU_TIME_MILLIS"
+      "CPU_MILLISECONDS"
     };
     hiveConf = new HiveConf();
     hiveConf.set(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, "verbose");

http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
new file mode 100644
index 0000000..365ddab
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+/**
+ * LLAP IO related counters.
+ */
+public enum LlapIOCounters {
+  NUM_VECTOR_BATCHES,
+  NUM_DECODED_BATCHES,
+  SELECTED_ROWGROUPS,
+  NUM_ERRORS,
+  ROWS_EMITTED,
+  METADATA_CACHE_HIT,
+  METADATA_CACHE_MISS,
+  CACHE_HIT_BYTES,
+  CACHE_MISS_BYTES,
+  ALLOCATED_BYTES,
+  ALLOCATED_USED_BYTES,
+  TOTAL_IO_TIME_NS,
+  DECODE_TIME_NS,
+  HDFS_TIME_NS,
+  CONSUMER_TIME_NS
+}

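For context, a minimal sketch of how these enum constants surface through the Tez counter API that the rest of this patch relies on: findCounter(Enum) lazily creates a counter in a group named after the enum's fully qualified class name. The class and values below are illustrative, not part of this commit.

    import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
    import org.apache.tez.common.counters.TezCounters;

    public class LlapIOCountersSketch {
      public static void main(String[] args) {
        TezCounters counters = new TezCounters();
        // Counter group defaults to "org.apache.hadoop.hive.llap.counters.LlapIOCounters".
        counters.findCounter(LlapIOCounters.CACHE_HIT_BYTES).increment(4096L);
        counters.findCounter(LlapIOCounters.SELECTED_ROWGROUPS).setValue(12L);
        System.out.println(counters.findCounter(LlapIOCounters.CACHE_HIT_BYTES).getValue()); // 4096
      }
    }
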
http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java
new file mode 100644
index 0000000..383b65f
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/FragmentCountersMap.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains references to tez counters
+ */
+public class FragmentCountersMap {
+  private static final Logger LOG = LoggerFactory.getLogger(FragmentCountersMap.class);
+  private static final ConcurrentMap<String, TezCounters> perFragmentCounters = new ConcurrentHashMap<>();
+
+  public static void registerCountersForFragment(String identifier, TezCounters tezCounters) {
+    if (perFragmentCounters.putIfAbsent(identifier, tezCounters) != null) {
+      LOG.warn("Not registering duplicate counters for fragment with tez identifier string=" +
+          identifier);
+    }
+  }
+
+  public static TezCounters getCountersForFragment(String identifier) {
+    return perFragmentCounters.get(identifier);
+  }
+
+  public static void unregisterCountersForFragment(String identifier) {
+    perFragmentCounters.remove(identifier);
+  }
+}

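The intended lifecycle, as a sketch with a hypothetical identifier (LlapTaskReporter and LlapRecordReader below use the real "dagId_vertexId_taskId_attemptId" key):

    import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
    import org.apache.tez.common.counters.TezCounters;

    public class FragmentCountersMapSketch {
      public static void main(String[] args) {
        // Reporter side: publish counters when the fragment starts.
        TezCounters tezCounters = new TezCounters();
        String fragFullId = "1_0_3_0"; // hypothetical dagId_vertexId_taskId_attemptId
        FragmentCountersMap.registerCountersForFragment(fragFullId, tezCounters);
        // IO side: any thread holding the same key sees the same TezCounters object.
        assert FragmentCountersMap.getCountersForFragment(fragFullId) == tezCounters;
        // Reporter side: drop the reference when the fragment finishes.
        FragmentCountersMap.unregisterCountersForFragment(fragFullId);
      }
    }
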
http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index 5d16f72..a53ac61 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheCounters;
+import org.apache.tez.common.counters.TezCounters;
 
 /**
  * Per query counters.
@@ -30,24 +31,6 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheCounters;
 public class QueryFragmentCounters implements LowLevelCacheCounters {
   private final boolean doUseTimeCounters;
 
-  public static enum Counter {
-    NUM_VECTOR_BATCHES,
-    NUM_DECODED_BATCHES,
-    SELECTED_ROWGROUPS,
-    NUM_ERRORS,
-    ROWS_EMITTED,
-    METADATA_CACHE_HIT,
-    METADATA_CACHE_MISS,
-    CACHE_HIT_BYTES,
-    CACHE_MISS_BYTES,
-    ALLOCATED_BYTES,
-    ALLOCATED_USED_BYTES,
-    TOTAL_IO_TIME_US,
-    DECODE_TIME_US,
-    HDFS_TIME_US,
-    CONSUMER_TIME_US
-  }
-
   public static enum Desc {
     MACHINE,
     TABLE,
@@ -57,25 +40,30 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
 
   private final AtomicLongArray fixedCounters;
   private final Object[] descs;
+  private final TezCounters tezCounters;
 
-  public QueryFragmentCounters(Configuration conf) {
-    fixedCounters = new AtomicLongArray(Counter.values().length);
+  public QueryFragmentCounters(Configuration conf, final TezCounters tezCounters) {
+    fixedCounters = new AtomicLongArray(LlapIOCounters.values().length);
     descs = new Object[Desc.values().length];
     doUseTimeCounters = HiveConf.getBoolVar(conf, ConfVars.LLAP_ORC_ENABLE_TIME_COUNTERS);
+    this.tezCounters = tezCounters;
     if (!doUseTimeCounters) {
-      setCounter(Counter.TOTAL_IO_TIME_US, -1);
-      setCounter(Counter.DECODE_TIME_US, -1);
-      setCounter(Counter.HDFS_TIME_US, -1);
-      setCounter(Counter.CONSUMER_TIME_US, -1);
+      setCounter(LlapIOCounters.TOTAL_IO_TIME_NS, -1);
+      setCounter(LlapIOCounters.DECODE_TIME_NS, -1);
+      setCounter(LlapIOCounters.HDFS_TIME_NS, -1);
+      setCounter(LlapIOCounters.CONSUMER_TIME_NS, -1);
     }
   }
 
-  public void incrCounter(Counter counter) {
+  public void incrCounter(LlapIOCounters counter) {
     incrCounter(counter, 1);
   }
 
-  public void incrCounter(Counter counter, long delta) {
+  public void incrCounter(LlapIOCounters counter, long delta) {
     fixedCounters.addAndGet(counter.ordinal(), delta);
+    if (tezCounters != null) {
+      tezCounters.findCounter(LlapIOCounters.values()[counter.ordinal()]).increment(delta);
+    }
   }
 
   @Override
@@ -83,13 +71,20 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
     return (doUseTimeCounters ? System.nanoTime() : 0);
   }
 
-  public void incrTimeCounter(Counter counter, long startTime) {
+  public void incrTimeCounter(LlapIOCounters counter, long startTime) {
     if (!doUseTimeCounters) return;
-    fixedCounters.addAndGet(counter.ordinal(), System.nanoTime() - startTime);
+    long delta = System.nanoTime() - startTime;
+    fixedCounters.addAndGet(counter.ordinal(), delta);
+    if (tezCounters != null) {
+      tezCounters.findCounter(LlapIOCounters.values()[counter.ordinal()]).increment(delta);
+    }
   }
 
-  public void setCounter(Counter counter, long value) {
+  public void setCounter(LlapIOCounters counter, long value) {
     fixedCounters.set(counter.ordinal(), value);
+    if (tezCounters != null) {
+      tezCounters.findCounter(LlapIOCounters.values()[counter.ordinal()]).setValue(value);
+    }
   }
 
   public void setDesc(Desc key, Object desc) {
@@ -98,23 +93,23 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
 
   @Override
   public void recordCacheHit(long bytesHit) {
-    incrCounter(Counter.CACHE_HIT_BYTES, bytesHit);
+    incrCounter(LlapIOCounters.CACHE_HIT_BYTES, bytesHit);
   }
 
   @Override
   public void recordCacheMiss(long bytesMissed) {
-    incrCounter(Counter.CACHE_MISS_BYTES, bytesMissed);
+    incrCounter(LlapIOCounters.CACHE_MISS_BYTES, bytesMissed);
   }
 
   @Override
   public void recordAllocBytes(long bytesUsed, long bytesAllocated) {
-    incrCounter(Counter.ALLOCATED_USED_BYTES, bytesUsed);
-    incrCounter(Counter.ALLOCATED_BYTES, bytesAllocated);
+    incrCounter(LlapIOCounters.ALLOCATED_USED_BYTES, bytesUsed);
+    incrCounter(LlapIOCounters.ALLOCATED_BYTES, bytesAllocated);
   }
 
   @Override
   public void recordHdfsTime(long startTime) {
-    incrTimeCounter(Counter.HDFS_TIME_US, startTime);
+    incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
   @Override
@@ -135,7 +130,7 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
       if (i != 0) {
         sb.append(", ");
       }
-      sb.append(Counter.values()[i].name()).append("=").append(fixedCounters.get(i));
+      sb.append(LlapIOCounters.values()[i].name()).append("=").append(fixedCounters.get(i));
     }
     sb.append(" ]");
     return sb.toString();

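The timing methods pair up as in this sketch (assumed usage, mirroring OrcEncodedDataReader below): startTimeCounter() returns System.nanoTime() when time counters are enabled, and incrTimeCounter() adds the elapsed nanoseconds to both the fixed counter array and, when present, the fragment's TezCounters.

    long startTime = counters.startTimeCounter();  // nanoTime(), or 0 when disabled
    readStripeMetadataFromHdfs();                  // hypothetical IO call being timed
    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
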
http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index bb9f341..08c6f27 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -34,6 +34,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezException;
@@ -43,11 +44,11 @@ import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.apache.tez.runtime.task.ErrorReporter;
 import org.slf4j.Logger;
@@ -71,13 +72,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class LlapTaskReporter implements TaskReporterInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskReporter.class);
-
   private final LlapTaskUmbilicalProtocol umbilical;
   private final long pollInterval;
   private final long sendCounterInterval;
   private final int maxEventsToGet;
   private final AtomicLong requestCounter;
   private final String containerIdStr;
+  private final String fragmentFullId;
 
   private final ListeningExecutorService heartbeatExecutor;
 
@@ -85,13 +86,15 @@ public class LlapTaskReporter implements TaskReporterInterface {
   HeartbeatCallable currentCallable;
 
   public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
-                      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
+                      long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter,
+      String containerIdStr, final String fragFullId) {
     this.umbilical = umbilical;
     this.pollInterval = amPollInterval;
     this.sendCounterInterval = sendCounterInterval;
     this.maxEventsToGet = maxEventsToGet;
     this.requestCounter = requestCounter;
     this.containerIdStr = containerIdStr;
+    this.fragmentFullId = fragFullId;
     ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
     heartbeatExecutor = MoreExecutors.listeningDecorator(executor);
@@ -103,6 +106,9 @@ public class LlapTaskReporter implements TaskReporterInterface {
   @Override
   public synchronized void registerTask(RuntimeTask task,
                                         ErrorReporter errorReporter) {
+    TezCounters tezCounters = task.addAndGetTezCounter(fragmentFullId);
+    FragmentCountersMap.registerCountersForFragment(fragmentFullId, tezCounters);
+    LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentFullId, task.getVertexName());
     currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
         maxEventsToGet, requestCounter, containerIdStr);
     ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
@@ -115,6 +121,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
    */
   @Override
   public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
+    LOG.info("Unregistered counters for fragment: {}", fragmentFullId);
+    FragmentCountersMap.unregisterCountersForFragment(fragmentFullId);
     currentCallable.markComplete();
     currentCallable = null;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index d88d82a..a1cfbb8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -51,6 +51,10 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.hadoop.shim.HadoopShim;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -64,6 +68,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
@@ -201,13 +206,20 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       }
     });
 
+    TezTaskAttemptID taskAttemptID = taskSpec.getTaskAttemptID();
+    TezTaskID taskId = taskAttemptID.getTaskID();
+    TezVertexID tezVertexID = taskId.getVertexID();
+    TezDAGID tezDAGID = tezVertexID.getDAGId();
+    String fragFullId = Joiner.on('_').join(tezDAGID.getId(), tezVertexID.getId(), taskId.getId(),
+        taskAttemptID.getId());
     taskReporter = new LlapTaskReporter(
         umbilical,
         confParams.amHeartbeatIntervalMsMax,
         confParams.amCounterHeartbeatInterval,
         confParams.amMaxEventsPerHeartbeat,
         new AtomicLong(0),
-        request.getContainerIdString());
+        request.getContainerIdString(),
+        fragFullId);
 
     String attemptId = fragmentInfo.getFragmentIdentifierString();
     IOContextMap.setThreadAttemptId(attemptId);

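The fragment key built here is just the underscore-joined numeric ids; with hypothetical values:

    // Joiner.on('_') over (dagId=1, vertexId=0, taskId=3, attemptId=0):
    String fragFullId = Joiner.on('_').join(1, 0, 3, 0);  // -> "1_0_3_0"
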
http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index a3d71c0..85cca97 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -27,15 +27,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
@@ -53,7 +53,12 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -120,6 +125,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
 
   private class LlapRecordReader
       implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
+    private final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
     private final FileSplit split;
     private final List<Integer> columnIds;
     private final SearchArgument sarg;
@@ -147,7 +153,21 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       this.columnIds = includedCols;
       this.sarg = ConvertAstToSearchArg.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-      this.counters = new QueryFragmentCounters(job);
+      String dagId = job.get("tez.mapreduce.dag.index");
+      String vertexId = job.get("tez.mapreduce.vertex.index");
+      String taskId = job.get("tez.mapreduce.task.index");
+      String taskAttemptId = job.get("tez.mapreduce.task.attempt.index");
+      TezCounters taskCounters = null;
+      if (dagId != null && vertexId != null && taskId != null && taskAttemptId != null) {
+        String fullId = Joiner.on('_').join(dagId, vertexId, taskId, taskAttemptId);
+        taskCounters = FragmentCountersMap.getCountersForFragment(fullId);
+        LOG.info("Received dagid_vertexid_taskid_attempid: {}", fullId);
+      } else {
+        LOG.warn("Not using tez counters as some identifier is null." +
+            " dagId: {} vertexId: {} taskId: {} taskAttempId: {}",
+            dagId, vertexId, taskId, taskAttemptId);
+      }
+      this.counters = new QueryFragmentCounters(job, taskCounters);
       this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
 
       MapWork mapWork = Utilities.getMapWork(job);
@@ -192,7 +212,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
         if (wasFirst) {
           firstReturnTime = counters.startTimeCounter();
         }
-        counters.incrTimeCounter(Counter.CONSUMER_TIME_US, firstReturnTime);
+        counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
         return false;
       }
       if (columnIds.size() != cvb.cols.length) {
@@ -330,7 +350,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
 
     @Override
     public void setError(Throwable t) {
-      counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS);
+      counters.incrCounter(LlapIOCounters.NUM_ERRORS);
       LlapIoImpl.LOG.info("setError called; closed " + isClosed
         + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size());
       assert t != null;

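The reader side reverses the handshake: it rebuilds the same key from the tez.mapreduce.* job properties and looks the counters up. A condensed sketch of the constructor logic above (method name hypothetical; the null checks shown above are omitted for brevity):

    import com.google.common.base.Joiner;
    import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.common.counters.TezCounters;

    static TezCounters lookupFragmentCounters(JobConf job) {
      String fullId = Joiner.on('_').join(
          job.get("tez.mapreduce.dag.index"),
          job.get("tez.mapreduce.vertex.index"),
          job.get("tez.mapreduce.task.index"),
          job.get("tez.mapreduce.task.attempt.index"));
      return FragmentCountersMap.getCountersForFragment(fullId);
    }
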
http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 2597848..28cae87 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
 import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
@@ -118,11 +119,11 @@ public class OrcEncodedDataConsumer
 
         // we are done reading a batch, send it to consumer for processing
         downstreamConsumer.consumeData(cvb);
-        counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, batchSize);
+        counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
       }
-      counters.incrTimeCounter(QueryFragmentCounters.Counter.DECODE_TIME_US, startTime);
-      counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, maxBatchesRG);
-      counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES);
+      counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
+      counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
+      counters.incrCounter(LlapIOCounters.NUM_DECODED_BATCHES);
     } catch (IOException e) {
       // Caller will return the batch.
       downstreamConsumer.setError(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index b36cf64..bcee56b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
@@ -78,7 +78,6 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.orc.OrcProto;
@@ -391,12 +390,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           }
           isFoundInCache = (stripeMetadata != null);
           if (!isFoundInCache) {
-            counters.incrCounter(Counter.METADATA_CACHE_MISS);
+            counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
             ensureMetadataReader();
             long startTimeHdfs = counters.startTimeCounter();
             stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0),
                 metadataReader, stripe, stripeIncludes, sargColumns);
-            counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
+            counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs);
             if (hasFileId && metadataCache != null) {
               stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
               if (DebugUtils.isTraceOrcEnabled()) {
@@ -413,11 +412,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
                 + " metadata for includes: " + DebugUtils.toString(stripeIncludes));
           }
           assert isFoundInCache;
-          counters.incrCounter(Counter.METADATA_CACHE_MISS);
+          counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
           ensureMetadataReader();
           updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns);
         } else if (isFoundInCache) {
-          counters.incrCounter(Counter.METADATA_CACHE_HIT);
+          counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
         }
       } catch (Throwable t) {
         consumer.setError(t);
@@ -462,7 +461,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private void recordReaderTime(long startTime) {
-    counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime);
+    counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
   }
 
   private static String getDbAndTableName(Path path) {
@@ -571,7 +570,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       if (stripeMetadata.hasAllIndexes(stripeIncludes)) return;
       long startTime = counters.startTimeCounter();
       stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns);
-      counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
+      counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
     }
   }
 
@@ -610,7 +609,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     long startTime = counters.startTimeCounter();
     ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
     orcReader = EncodedOrcFile.createReader(path, opts);
-    counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
+    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
   /**
@@ -621,10 +620,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     if (fileId != null && metadataCache != null) {
       metadata = metadataCache.getFileMetadata(fileId);
       if (metadata != null) {
-        counters.incrCounter(Counter.METADATA_CACHE_HIT);
+        counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
         return metadata;
       } else {
-        counters.incrCounter(Counter.METADATA_CACHE_MISS);
+        counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
       }
     }
     ensureOrcReader();
@@ -651,14 +650,14 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         value = metadataCache.getStripeMetadata(stripeKey);
       }
       if (value == null || !value.hasAllIndexes(globalInc)) {
-        counters.incrCounter(Counter.METADATA_CACHE_MISS);
+        counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
         ensureMetadataReader();
         StripeInformation si = fileMetadata.getStripes().get(stripeIx);
         if (value == null) {
           long startTime = counters.startTimeCounter();
           value = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0),
               metadataReader, si, globalInc, sargColumns);
-          counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
+          counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
           if (hasFileId && metadataCache != null) {
             value = metadataCache.putStripeMetadata(value);
             if (DebugUtils.isTraceOrcEnabled()) {
@@ -676,7 +675,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
           updateLoadedIndexes(value, si, globalInc, sargColumns);
         }
       } else {
-        counters.incrCounter(Counter.METADATA_CACHE_HIT);
+        counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
       }
       result.add(value);
       consumer.setStripeMetadata(value);
@@ -689,7 +688,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     if (metadataReader != null) return;
     long startTime = counters.startTimeCounter();
     metadataReader = orcReader.metadata();
-    counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
+    counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
   }
 
   @Override
@@ -772,7 +771,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     } else if (!isNone) {
       count = rgCount;
     }
-    counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count);
+    counters.setCounter(LlapIOCounters.SELECTED_ROWGROUPS, count);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/b6023c79/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index c8d135e..418a03e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -26,7 +26,6 @@ import java.io.InterruptedIOException;
 import java.io.PrintStream;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -38,6 +37,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -72,9 +72,13 @@ public class TezJobMonitor {
 
   private static final int COLUMN_1_WIDTH = 16;
   private static final int SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH;
+  private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
+  private static final String PREP_SUMMARY_HEADER = "DAG Preparation Summary";
+  private static final String TASK_SUMMARY_HEADER = "Task Execution Summary";
+  private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
 
   // keep this within 80 chars width. If more columns needs to be added then update min terminal
-  // width requirement and separator width accordingly
+  // width requirement and SEPARATOR width accordingly
   private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
   private static final String VERTEX_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
   private static final String FOOTER_FORMAT = "%-15s  %-30s %-4s  %-25s";
@@ -82,12 +86,15 @@ public class TezJobMonitor {
       "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED",
"KILLED");
 
   // method and dag summary format
-  private static final String SUMMARY_HEADER_FORMAT = "%-16s %-12s %-12s %-12s %-19s %-19s %-15s %-15s %-15s";
-  private static final String SUMMARY_VERTEX_FORMAT = "%-16s %11s %16s %12s %16s %18s %18s %14s %16s";
+  private static final String SUMMARY_HEADER_FORMAT = "%10s %14s %13s %12s %14s %15s";
   private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT,
-      "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION_SECONDS",
-      "CPU_TIME_MILLIS", "GC_TIME_MILLIS", "INPUT_RECORDS", "OUTPUT_RECORDS");
+      "VERTICES", "DURATION(ms)", "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS");
 
+  // LLAP counters
+  private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
+  private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
+      "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
+      "ALLOCATION", "USED", "TOTAL_IO");
   private static final String TOTAL_PREP_TIME = "TotalPrepTime";
   private static final String METHOD = "METHOD";
   private static final String DURATION = "DURATION(ms)";
@@ -95,7 +102,6 @@ public class TezJobMonitor {
   // in-place progress update related variables
   private int lines;
   private final PrintStream out;
-  private String separator;
 
   private transient LogHelper console;
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
@@ -142,10 +148,6 @@ public class TezJobMonitor {
     // all progress updates are written to info stream and log file. In-place updates can only be
     // done to info stream (console)
     out = console.getInfoStream();
-    separator = "";
-    for (int i = 0; i < SEPARATOR_WIDTH; i++) {
-      separator += "-";
-    }
   }
 
   /**
@@ -219,7 +221,8 @@ public class TezJobMonitor {
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
     long startTime = 0;
     boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
-      Utilities.isPerfOrAboveLogging(conf);
+        Utilities.isPerfOrAboveLogging(conf);
+    boolean llapIoEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED);
 
     boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
     synchronized(shutdownList) {
@@ -285,8 +288,23 @@ public class TezJobMonitor {
                   + String.format("%.2f seconds", duration));
               console.printInfo("\n");
 
+              console.printInfo(PREP_SUMMARY_HEADER);
               printMethodsSummary();
+              console.printInfo(SEPARATOR);
+              console.printInfo("");
+
+              console.printInfo(TASK_SUMMARY_HEADER);
               printDagSummary(progressMap, console, dagClient, conf, dag);
+              console.printInfo(SEPARATOR);
+              console.printInfo("");
+
+              if (llapIoEnabled) {
+                console.printInfo(LLAP_IO_SUMMARY_HEADER);
+                printLlapIOSummary(progressMap, console, dagClient);
+                console.printInfo(SEPARATOR);
+              }
+
+              console.printInfo("\n");
             }
             running = false;
             done = true;
@@ -408,7 +426,9 @@ public class TezJobMonitor {
 
     /* Build the method summary header */
     String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION);
-    console.printInfo(methodBreakdownHeader);
+    console.printInfo(SEPARATOR);
+    reprintLineWithColorAsBold(methodBreakdownHeader, Ansi.Color.CYAN);
+    console.printInfo(SEPARATOR);
 
     for (String method : perfLoggerReportMethods) {
       long duration = perfLogger.getDuration(method);
@@ -423,7 +443,7 @@ public class TezJobMonitor {
     totalInPrepTime = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) -
         perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT);
 
-    console.printInfo(String.format("%-30s %11s\n", TOTAL_PREP_TIME, commaFormat.format(
+    console.printInfo(String.format("%-30s %11s", TOTAL_PREP_TIME, commaFormat.format(
         totalInPrepTime)));
   }
 
@@ -448,18 +468,16 @@ public class TezJobMonitor {
     }
 
     /* Print the per Vertex summary */
-    console.printInfo(SUMMARY_HEADER);
+    console.printInfo(SEPARATOR);
+    reprintLineWithColorAsBold(SUMMARY_HEADER, Ansi.Color.CYAN);
+    console.printInfo(SEPARATOR);
     SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
     Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
     statusOptions.add(StatusGetOpts.GET_COUNTERS);
     for (String vertexName : keys) {
       Progress progress = progressMap.get(vertexName);
       if (progress != null) {
-        final int totalTasks = progress.getTotalTaskCount();
-        final int failedTaskAttempts = progress.getFailedTaskAttemptCount();
-        final int killedTaskAttempts = progress.getKilledTaskAttemptCount();
-        final double duration =
-            perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName) / 1000.0;
+        final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName);
         VertexStatus vertexStatus = null;
         try {
           vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions);
@@ -540,11 +558,8 @@ public class TezJobMonitor {
                     + vertexName.replace(" ", "_"))
                 + hiveOutputIntermediateRecords;
 
-        String vertexExecutionStats = String.format(SUMMARY_VERTEX_FORMAT,
+        String vertexExecutionStats = String.format(SUMMARY_HEADER_FORMAT,
             vertexName,
-            totalTasks,
-            failedTaskAttempts,
-            killedTaskAttempts,
             secondsFormat.format((duration)),
             commaFormat.format(cpuTimeMillis),
             commaFormat.format(gcTimeMillis),
@@ -555,6 +570,71 @@ public class TezJobMonitor {
     }
   }
 
+
+  private String humanReadableByteCount(long bytes) {
+    int unit = 1000; // use binary units instead?
+    if (bytes < unit) {
+      return bytes + "B";
+    }
+    int exp = (int) (Math.log(bytes) / Math.log(unit));
+    String suffix = "KMGTPE".charAt(exp-1) + "";
+    return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
+  }
+
+  private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console,
+      DAGClient dagClient) throws Exception {
+    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    boolean first = false;
+    String counterGroup = LlapIOCounters.class.getName();
+    for (String vertexName : keys) {
+      // Reducers do not benefit from LLAP IO so no point in printing
+      if (vertexName.startsWith("Reducer")) {
+        continue;
+      }
+      TezCounters vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
+          .getVertexCounters();
+      if (vertexCounters != null) {
+        final long selectedRowgroups = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());
+        final long metadataCacheHit = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name());
+        final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name());
+        final long cacheHitBytes = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name());
+        final long cacheMissBytes = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name());
+        final long allocatedBytes = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.ALLOCATED_BYTES.name());
+        final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name());
+        final long totalIoTime = getCounterValueByGroupName(vertexCounters,
+            counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name());
+
+        if (!first) {
+          console.printInfo(SEPARATOR);
+          reprintLineWithColorAsBold(LLAP_SUMMARY_HEADER, Ansi.Color.CYAN);
+          console.printInfo(SEPARATOR);
+          first = true;
+        }
+
+        String queryFragmentStats = String.format(LLAP_SUMMARY_HEADER_FORMAT,
+            vertexName,
+            selectedRowgroups,
+            metadataCacheHit,
+            metadataCacheMiss,
+            humanReadableByteCount(cacheHitBytes),
+            humanReadableByteCount(cacheMissBytes),
+            humanReadableByteCount(allocatedBytes),
+            humanReadableByteCount(allocatedUsedBytes),
+            secondsFormat.format(totalIoTime / 1000_000_000.0) + "s");
+        console.printInfo(queryFragmentStats);
+      }
+    }
+  }
+
   private void printStatusInPlace(Map<String, Progress> progressMap, long startTime,
       boolean vextexStatusFromAM, DAGClient dagClient) {
     StringBuilder reportBuffer = new StringBuilder();
@@ -568,9 +648,9 @@ public class TezJobMonitor {
     // -------------------------------------------------------------------------------
     //         VERTICES     STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
     // -------------------------------------------------------------------------------
-    reprintLine(separator);
+    reprintLine(SEPARATOR);
     reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
-    reprintLine(separator);
+    reprintLine(SEPARATOR);
 
     SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
     int idx = 0;
@@ -663,11 +743,11 @@ public class TezJobMonitor {
     // -------------------------------------------------------------------------------
     // VERTICES: 03/04            [=================>>-----] 86%  ELAPSED TIME: 1.71 s
     // -------------------------------------------------------------------------------
-    reprintLine(separator);
+    reprintLine(SEPARATOR);
     final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
     String footer = getFooter(keys.size(), completed.size(), progress, startTime);
     reprintLineWithColorAsBold(footer, Ansi.Color.RED);
-    reprintLine(separator);
+    reprintLine(SEPARATOR);
   }
 
   private String getMode(String name, Map<String, BaseWork> workMap) {

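The LLAP IO summary formats its byte counters through the humanReadableByteCount() helper added above, which uses SI (1000-based) units. A standalone copy for illustration only, with sample outputs:

    static String humanReadableByteCount(long bytes) {
      int unit = 1000; // SI units; a 1024-based variant would use KiB/MiB suffixes
      if (bytes < unit) {
        return bytes + "B";
      }
      int exp = (int) (Math.log(bytes) / Math.log(unit));
      String suffix = "KMGTPE".charAt(exp - 1) + "";
      return String.format("%.2f%sB", bytes / Math.pow(unit, exp), suffix);
    }
    // humanReadableByteCount(512)     -> "512B"
    // humanReadableByteCount(1536)    -> "1.54KB"
    // humanReadableByteCount(1250000) -> "1.25MB"
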
