hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prasan...@apache.org
Subject [3/3] hive git commit: HIVE-13258: LLAP: Add hdfs bytes read and spilled bytes to tez print summary (Prasanth Jayachandran reviewed by Siddharth Seth)
Date Wed, 13 Jul 2016 16:42:39 GMT
HIVE-13258: LLAP: Add hdfs bytes read and spilled bytes to tez print summary (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5b2c36a4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5b2c36a4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5b2c36a4

Branch: refs/heads/master
Commit: 5b2c36a4a88d279f134f3a8e5c928bb1473b9673
Parents: f9adb4a
Author: Prasanth Jayachandran <prasanthj@apache.org>
Authored: Wed Jul 13 09:42:22 2016 -0700
Committer: Prasanth Jayachandran <prasanthj@apache.org>
Committed: Wed Jul 13 09:42:22 2016 -0700

----------------------------------------------------------------------
 itests/qtest/pom.xml                            |    2 +-
 .../test/resources/testconfiguration.properties |    3 +
 .../org/apache/hadoop/hive/llap/LlapUtil.java   |  108 ++
 .../hive/llap/counters/LlapIOCounters.java      |   50 +-
 .../llap/counters/QueryFragmentCounters.java    |    4 +
 .../hive/llap/daemon/impl/LlapTaskReporter.java |   16 +-
 .../daemon/impl/StatsRecordingThreadPool.java   |  189 +++
 .../llap/daemon/impl/TaskRunnerCallable.java    |   22 +-
 .../hive/llap/io/api/impl/LlapInputFormat.java  |   48 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   13 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   |    5 +
 .../hive/llap/tezplugins/LlapTezUtils.java      |   28 +
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  |   66 +
 .../ql/hooks/PostExecTezSummaryPrinter.java     |   22 +-
 .../queries/clientpositive/orc_llap_counters.q  |  182 +++
 .../queries/clientpositive/orc_llap_counters1.q |   83 ++
 .../results/clientpositive/llap/orc_llap.q.out  |  252 ++--
 .../clientpositive/llap/orc_llap_counters.q.out | 1301 ++++++++++++++++++
 .../llap/orc_llap_counters1.q.out               |  331 +++++
 .../clientpositive/llap/orc_ppd_basic.q.out     |  551 ++++++++
 .../clientpositive/tez/orc_ppd_basic.q.out      |  252 ++++
 21 files changed, 3374 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/itests/qtest/pom.xml
----------------------------------------------------------------------
diff --git a/itests/qtest/pom.xml b/itests/qtest/pom.xml
index 306a0ed..17968e6 100644
--- a/itests/qtest/pom.xml
+++ b/itests/qtest/pom.xml
@@ -545,7 +545,7 @@
                   logFile="${project.build.directory}/testminitezclidrivergen.log"
                   logDirectory="${project.build.directory}/qfile-results/clientpositive/"
                   hadoopVersion="${hadoop.version}"
-                  initScript="q_test_init.sql"
+		  initScript="${initScript}"
                   cleanupScript="q_test_cleanup.sql"/>
 
                 <qtestgen hiveRootDirectory="${basedir}/${hive.path.to.root}/"

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index eb0d1d7..73fcb03 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -526,6 +526,9 @@ minillap.shared.query.files=bucket_map_join_tez1.q,\
 
 
 minillap.query.files=llap_udf.q,\
+  orc_llap.q,\
+  orc_llap_counters.q,\
+  orc_llap_counters1.q,\
   orc_llap_nonvector.q
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 505ddb1..298be76 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -14,9 +14,14 @@
 package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.security.SecurityUtil;
@@ -56,4 +61,107 @@ public class LlapUtil {
     String[] components = principal.split("[/@]");
     return (components == null || components.length != 3) ? principal : components[0];
   }
+
+  public static List<StatisticsData> getStatisticsForScheme(final String scheme,
+      final List<StatisticsData> stats) {
+    List<StatisticsData> result = new ArrayList<>();
+    if (stats != null && scheme != null) {
+      for (StatisticsData s : stats) {
+        if (s.getScheme().equalsIgnoreCase(scheme)) {
+          result.add(s);
+        }
+      }
+    }
+    return result;
+  }
+
+  public static Map<String, FileSystem.Statistics> getCombinedFileSystemStatistics() {
+    final List<FileSystem.Statistics> allStats = FileSystem.getAllStatistics();
+    final Map<String, FileSystem.Statistics> result = new HashMap<>();
+    for (FileSystem.Statistics statistics : allStats) {
+      final String scheme = statistics.getScheme();
+      if (result.containsKey(scheme)) {
+        FileSystem.Statistics existing = result.get(scheme);
+        FileSystem.Statistics combined = combineFileSystemStatistics(existing, statistics);
+        result.put(scheme, combined);
+      } else {
+        result.put(scheme, statistics);
+      }
+    }
+    return result;
+  }
+
+  private static FileSystem.Statistics combineFileSystemStatistics(final FileSystem.Statistics s1,
+      final FileSystem.Statistics s2) {
+    FileSystem.Statistics result = new FileSystem.Statistics(s1);
+    result.incrementReadOps(s2.getReadOps());
+    result.incrementLargeReadOps(s2.getLargeReadOps());
+    result.incrementWriteOps(s2.getWriteOps());
+    result.incrementBytesRead(s2.getBytesRead());
+    result.incrementBytesWritten(s2.getBytesWritten());
+    return result;
+  }
+
+  public static List<StatisticsData> cloneThreadLocalFileSystemStatistics() {
+    List<StatisticsData> result = new ArrayList<>();
+    // the thread-local filesystem statistics are private and cannot be cloned directly, so copy them into our own StatisticsData class
+    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+      result.add(new StatisticsData(statistics.getScheme(), statistics.getThreadStatistics()));
+    }
+    return result;
+  }
+
+  public static class StatisticsData {
+    long bytesRead;
+    long bytesWritten;
+    int readOps;
+    int largeReadOps;
+    int writeOps;
+    String scheme;
+
+    public StatisticsData(String scheme, FileSystem.Statistics.StatisticsData fsStats) {
+      this.scheme = scheme;
+      this.bytesRead = fsStats.getBytesRead();
+      this.bytesWritten = fsStats.getBytesWritten();
+      this.readOps = fsStats.getReadOps();
+      this.largeReadOps = fsStats.getLargeReadOps();
+      this.writeOps = fsStats.getWriteOps();
+    }
+
+    public long getBytesRead() {
+      return bytesRead;
+    }
+
+    public long getBytesWritten() {
+      return bytesWritten;
+    }
+
+    public int getReadOps() {
+      return readOps;
+    }
+
+    public int getLargeReadOps() {
+      return largeReadOps;
+    }
+
+    public int getWriteOps() {
+      return writeOps;
+    }
+
+    public String getScheme() {
+      return scheme;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(" scheme: ").append(scheme);
+      sb.append(" bytesRead: ").append(bytesRead);
+      sb.append(" bytesWritten: ").append(bytesWritten);
+      sb.append(" readOps: ").append(readOps);
+      sb.append(" largeReadOps: ").append(largeReadOps);
+      sb.append(" writeOps: ").append(writeOps);
+      return sb.toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
index 365ddab..1ed23ba 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/counters/LlapIOCounters.java
@@ -15,23 +15,43 @@
  */
 package org.apache.hadoop.hive.llap.counters;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * LLAP IO related counters.
  */
 public enum LlapIOCounters {
-  NUM_VECTOR_BATCHES,
-  NUM_DECODED_BATCHES,
-  SELECTED_ROWGROUPS,
-  NUM_ERRORS,
-  ROWS_EMITTED,
-  METADATA_CACHE_HIT,
-  METADATA_CACHE_MISS,
-  CACHE_HIT_BYTES,
-  CACHE_MISS_BYTES,
-  ALLOCATED_BYTES,
-  ALLOCATED_USED_BYTES,
-  TOTAL_IO_TIME_NS,
-  DECODE_TIME_NS,
-  HDFS_TIME_NS,
-  CONSUMER_TIME_NS
+  NUM_VECTOR_BATCHES(true),
+  NUM_DECODED_BATCHES(true),
+  SELECTED_ROWGROUPS(true),
+  NUM_ERRORS(true),
+  ROWS_EMITTED(true),
+  METADATA_CACHE_HIT(true),
+  METADATA_CACHE_MISS(true),
+  CACHE_HIT_BYTES(true),
+  CACHE_MISS_BYTES(true),
+  ALLOCATED_BYTES(true),
+  ALLOCATED_USED_BYTES(true),
+  TOTAL_IO_TIME_NS(false),
+  DECODE_TIME_NS(false),
+  HDFS_TIME_NS(false),
+  CONSUMER_TIME_NS(false);
+
+  // flag indicating whether this counter's value is stable (and therefore safe to verify) across different test runs
+  private boolean testSafe;
+
+  LlapIOCounters(final boolean testSafe) {
+    this.testSafe = testSafe;
+  }
+
+  public static List<String> testSafeCounterNames() {
+    List<String> testSafeCounters = new ArrayList<>();
+    for (LlapIOCounters counter : values()) {
+      if (counter.testSafe) {
+        testSafeCounters.add(counter.name());
+      }
+    }
+    return testSafeCounters;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index a53ac61..0c858eb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -135,4 +135,8 @@ public class QueryFragmentCounters implements LowLevelCacheCounters {
     sb.append(" ]");
     return sb.toString();
   }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
index 9c34ada..39b4b0e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
@@ -80,7 +80,7 @@ public class LlapTaskReporter implements TaskReporterInterface {
   private final int maxEventsToGet;
   private final AtomicLong requestCounter;
   private final String containerIdStr;
-  private final String fragmentFullId;
+  private final String fragmentId;
   private final TezEvent initialEvent;
 
   private final ListeningExecutorService heartbeatExecutor;
@@ -90,14 +90,14 @@ public class LlapTaskReporter implements TaskReporterInterface {
 
   public LlapTaskReporter(LlapTaskUmbilicalProtocol umbilical, long amPollInterval,
                       long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter,
-      String containerIdStr, final String fragFullId, TezEvent initialEvent) {
+      String containerIdStr, final String fragmentId, TezEvent initialEvent) {
     this.umbilical = umbilical;
     this.pollInterval = amPollInterval;
     this.sendCounterInterval = sendCounterInterval;
     this.maxEventsToGet = maxEventsToGet;
     this.requestCounter = requestCounter;
     this.containerIdStr = containerIdStr;
-    this.fragmentFullId = fragFullId;
+    this.fragmentId = fragmentId;
     this.initialEvent = initialEvent;
     ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("TaskHeartbeatThread").build());
@@ -110,9 +110,9 @@ public class LlapTaskReporter implements TaskReporterInterface {
   @Override
   public synchronized void registerTask(RuntimeTask task,
                                         ErrorReporter errorReporter) {
-    TezCounters tezCounters = task.addAndGetTezCounter(fragmentFullId);
-    FragmentCountersMap.registerCountersForFragment(fragmentFullId, tezCounters);
-    LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentFullId, task.getVertexName());
+    TezCounters tezCounters = task.addAndGetTezCounter(fragmentId);
+    FragmentCountersMap.registerCountersForFragment(fragmentId, tezCounters);
+    LOG.info("Registered counters for fragment: {} vertexName: {}", fragmentId, task.getVertexName());
     currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval,
         maxEventsToGet, requestCounter, containerIdStr, initialEvent);
     ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
@@ -125,8 +125,8 @@ public class LlapTaskReporter implements TaskReporterInterface {
    */
   @Override
   public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
-    LOG.info("Unregistered counters for fragment: {}", fragmentFullId);
-    FragmentCountersMap.unregisterCountersForFragment(fragmentFullId);
+    LOG.info("Unregistered counters for fragment: {}", fragmentId);
+    FragmentCountersMap.unregisterCountersForFragment(fragmentId);
     currentCallable.markComplete();
     currentCallable = null;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
new file mode 100644
index 0000000..9b3ce7e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.task.TaskRunner2Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Custom thread pool implementation that records per thread file system statistics in TezCounters.
+ * The way it works is we capture before and after snapshots of file system thread statistics,
+ * compute the delta difference in statistics and update them in tez task counters.
+ */
+public class StatsRecordingThreadPool extends ThreadPoolExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(StatsRecordingThreadPool.class);
+  // uncaught exception handler that will be set for all threads before execution
+  private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+  public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize,
+      final long keepAliveTime,
+      final TimeUnit unit,
+      final BlockingQueue<Runnable> workQueue,
+      final ThreadFactory threadFactory) {
+    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, null);
+  }
+
+  public StatsRecordingThreadPool(final int corePoolSize, final int maximumPoolSize,
+      final long keepAliveTime,
+      final TimeUnit unit,
+      final BlockingQueue<Runnable> workQueue,
+      final ThreadFactory threadFactory, Thread.UncaughtExceptionHandler handler) {
+    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+    this.uncaughtExceptionHandler = handler;
+  }
+
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
+    return new FutureTask(new WrappedCallable(callable, uncaughtExceptionHandler));
+  }
+
+  public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
+    this.uncaughtExceptionHandler = handler;
+  }
+
+  /**
+   * Callable that wraps the actual callable submitted to the thread pool and invokes completion
+   * listener in finally block.
+   *
+   * @param <V> - actual callable
+   */
+  private static class WrappedCallable<V> implements Callable<V> {
+    private Callable<V> actualCallable;
+    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
+
+    WrappedCallable(final Callable<V> callable,
+        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+      this.actualCallable = callable;
+      this.uncaughtExceptionHandler = uncaughtExceptionHandler;
+    }
+
+    @Override
+    public V call() throws Exception {
+      Thread thread = Thread.currentThread();
+
+      // setup uncaught exception handler for the current thread
+      if (uncaughtExceptionHandler != null) {
+        thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+      }
+
+      // clone thread local file system statistics
+      List<LlapUtil.StatisticsData> statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
+
+      try {
+        return actualCallable.call();
+      } finally {
+        updateFileSystemCounters(statsBefore, actualCallable);
+      }
+    }
+
+    private void updateFileSystemCounters(final List<LlapUtil.StatisticsData> statsBefore,
+        final Callable<V> actualCallable) {
+      Thread thread = Thread.currentThread();
+      TezCounters tezCounters = null;
+      // add tez counters for task execution and llap io
+      if (actualCallable instanceof TaskRunner2Callable) {
+        TaskRunner2Callable taskRunner2Callable = (TaskRunner2Callable) actualCallable;
+        // counters for task execution side
+        tezCounters = taskRunner2Callable.addAndGetTezCounter(FileSystemCounter.class.getName());
+      } else if (actualCallable instanceof OrcEncodedDataReader) {
+        // counters for llap io side
+        tezCounters = ((OrcEncodedDataReader) actualCallable).getTezCounters();
+      }
+
+      if (tezCounters != null) {
+        if (statsBefore != null) {
+          // if there are multiple stats entries for the same scheme (e.g. from different NameNodes),
+          // this method combines them into a single entry per scheme
+          Map<String, FileSystem.Statistics> schemeToStats = LlapUtil
+              .getCombinedFileSystemStatistics();
+          for (Map.Entry<String, FileSystem.Statistics> entry : schemeToStats.entrySet()) {
+            final String scheme = entry.getKey();
+            FileSystem.Statistics statistics = entry.getValue();
+            FileSystem.Statistics.StatisticsData threadFSStats = statistics
+                .getThreadStatistics();
+            List<LlapUtil.StatisticsData> allStatsBefore = LlapUtil
+                .getStatisticsForScheme(scheme, statsBefore);
+            long bytesReadDelta = 0;
+            long bytesWrittenDelta = 0;
+            long readOpsDelta = 0;
+            long largeReadOpsDelta = 0;
+            long writeOpsDelta = 0;
+            // more schemes may appear after execution, since the task might access a
+            // filesystem it had not touched before. If no matching scheme existed before
+            // execution, use the post-execution values directly instead of computing a delta
+            if (allStatsBefore != null && !allStatsBefore.isEmpty()) {
+              for (LlapUtil.StatisticsData sb : allStatsBefore) {
+                bytesReadDelta += threadFSStats.getBytesRead() - sb.getBytesRead();
+                bytesWrittenDelta += threadFSStats.getBytesWritten() - sb.getBytesWritten();
+                readOpsDelta += threadFSStats.getReadOps() - sb.getReadOps();
+                largeReadOpsDelta += threadFSStats.getLargeReadOps() - sb.getLargeReadOps();
+                writeOpsDelta += threadFSStats.getWriteOps() - sb.getWriteOps();
+              }
+            } else {
+              bytesReadDelta = threadFSStats.getBytesRead();
+              bytesWrittenDelta = threadFSStats.getBytesWritten();
+              readOpsDelta = threadFSStats.getReadOps();
+              largeReadOpsDelta = threadFSStats.getLargeReadOps();
+              writeOpsDelta = threadFSStats.getWriteOps();
+            }
+            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_READ)
+                .increment(bytesReadDelta);
+            tezCounters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN)
+                .increment(bytesWrittenDelta);
+            tezCounters.findCounter(scheme, FileSystemCounter.READ_OPS).increment(readOpsDelta);
+            tezCounters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS)
+                .increment(largeReadOpsDelta);
+            tezCounters.findCounter(scheme, FileSystemCounter.WRITE_OPS)
+                .increment(writeOpsDelta);
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Updated stats: instance: {} thread name: {} thread id: {} scheme: {} " +
+                      "bytesRead: {} bytesWritten: {} readOps: {} largeReadOps: {} writeOps: {}",
+                  actualCallable.getClass().getSimpleName(), thread.getName(), thread.getId(),
+                  scheme, bytesReadDelta, bytesWrittenDelta, readOpsDelta, largeReadOpsDelta,
+                  writeOpsDelta);
+            }
+          }
+        } else {
+          LOG.warn("File system statistics snapshot before execution of thread is null." +
+                  "Thread name: {} id: {} allStats: {}", thread.getName(), thread.getId(),
+              statsBefore);
+        }
+      } else {
+        LOG.warn("TezCounters is null for callable type: {}",
+            actualCallable.getClass().getSimpleName());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 143e755..fb64f0b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -23,7 +23,9 @@ import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,6 +41,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -46,6 +49,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -100,7 +104,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final FragmentCompletionHandler fragmentCompletionHanler;
   private volatile TezTaskRunner2 taskRunner;
   private volatile TaskReporterInterface taskReporter;
-  private volatile ListeningExecutorService executor;
+  private volatile ExecutorService executor;
   private LlapTaskUmbilicalProtocol umbilical;
   private volatile long startTime;
   private volatile String threadName;
@@ -181,12 +185,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     }
 
     // TODO This executor seems unnecessary. Here and TezChild
-    ExecutorService executorReal = Executors.newFixedThreadPool(1,
+    executor = new StatsRecordingThreadPool(1, 1,
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>(),
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setNameFormat("TezTaskRunner")
             .build());
-    executor = MoreExecutors.listeningDecorator(executorReal);
 
     // TODO Consolidate this code with TezChild.
     runtimeWatch.start();
@@ -214,12 +219,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       }
     });
 
-    TezTaskAttemptID taskAttemptID = taskSpec.getTaskAttemptID();
-    TezTaskID taskId = taskAttemptID.getTaskID();
-    TezVertexID tezVertexID = taskId.getVertexID();
-    TezDAGID tezDAGID = tezVertexID.getDAGId();
-    String fragFullId = Joiner.on('_').join(tezDAGID.getId(), tezVertexID.getId(), taskId.getId(),
-        taskAttemptID.getId());
+    String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
     taskReporter = new LlapTaskReporter(
         umbilical,
         confParams.amHeartbeatIntervalMsMax,
@@ -227,7 +227,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         confParams.amMaxEventsPerHeartbeat,
         new AtomicLong(0),
         request.getContainerIdString(),
-        fragFullId,
+        fragmentId,
         initialEvent);
 
     String attemptId = fragmentInfo.getFragmentIdentifierString();

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index ff9604e..c4ffb9f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -34,14 +34,18 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -87,12 +91,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
   private final InputFormat sourceInputFormat;
   private final AvoidSplitCombination sourceASC;
   private final ColumnVectorProducer cvp;
-  private final ListeningExecutorService executor;
+  private final ExecutorService executor;
   private final String hostName;
 
   @SuppressWarnings("rawtypes")
   LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
-      ListeningExecutorService executor) {
+      ExecutorService executor) {
     // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
     //       We'd need to plumb it thru and use it to get data to cache/etc.
     assert sourceInputFormat instanceof OrcInputFormat;
@@ -173,19 +177,13 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       this.columnIds = includedCols;
       this.sarg = ConvertAstToSearchArg.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-      String dagId = job.get("tez.mapreduce.dag.index");
-      String vertexId = job.get("tez.mapreduce.vertex.index");
-      String taskId = job.get("tez.mapreduce.task.index");
-      String taskAttemptId = job.get("tez.mapreduce.task.attempt.index");
+      String fragmentId = LlapTezUtils.getFragmentId(job);
       TezCounters taskCounters = null;
-      if (dagId != null && vertexId != null && taskId != null && taskAttemptId != null) {
-        String fullId = Joiner.on('_').join(dagId, vertexId, taskId, taskAttemptId);
-        taskCounters = FragmentCountersMap.getCountersForFragment(fullId);
-        LOG.info("Received dagid_vertexid_taskid_attempid: {}", fullId);
+      if (fragmentId != null) {
+        taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
+        LOG.info("Received fragment id: {}", fragmentId);
       } else {
-        LOG.warn("Not using tez counters as some identifier is null." +
-            " dagId: {} vertexId: {} taskId: {} taskAttempId: {}",
-            dagId, vertexId, taskId, taskAttemptId);
+        LOG.warn("Not using tez counters as fragment id string is null");
       }
       this.counters = new QueryFragmentCounters(job, taskCounters);
       this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
@@ -255,17 +253,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       return rbCtx;
     }
 
-    private final class UncaughtErrorHandler implements FutureCallback<Void> {
+    /**
+     * Uncaught-exception handler installed on the IO-elevator thread pool
+     * (see startRead): logs any throwable that escapes a reader thread and
+     * propagates it to the record reader via setError().
+     */
+    private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
       @Override
-      public void onSuccess(Void result) {
-        // Successful execution of reader is supposed to call setDone.
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        // Reader is not supposed to throw AFTER calling setError.
-        LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage());
-        setError(t);
+      public void uncaughtException(final Thread t, final Throwable e) {
+        // Reader is not supposed to throw after calling setError; anything
+        // reaching here is an unexpected failure of the reader thread.
+        LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
+            " Message: {}", t.getName(), t.getId(), e.getMessage());
+        setError(e);
       }
     }
 
@@ -274,9 +267,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       ReadPipeline rp = cvp.createReadPipeline(
           this, split, columnIds, sarg, columnNames, counters);
       feedback = rp;
-      ListenableFuture<Void> future = executor.submit(rp.getReadCallable());
-      // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases.
-      Futures.addCallback(future, new UncaughtErrorHandler());
+      if (executor instanceof StatsRecordingThreadPool) {
+        // Every thread created by this thread pool will use the same handler
+        ((StatsRecordingThreadPool) executor)
+            .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
+      }
+      executor.submit(rp.getReadCallable());
     }
 
     ColumnVectorBatch nextCvb() throws InterruptedException, IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 9316dff..9deef0c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -22,10 +22,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.ObjectName;
 
+import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -69,7 +74,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
   private static final String MODE_CACHE = "cache", MODE_ALLOCATOR = "allocator";
 
   private final ColumnVectorProducer cvp;
-  private final ListeningExecutorService executor;
+  private final ExecutorService executor;
   private final LlapDaemonCacheMetrics cacheMetrics;
   private final LlapDaemonIOMetrics ioMetrics;
   private ObjectName buddyAllocatorMXBean;
@@ -137,8 +142,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     }
     // IO thread pool. Uncaught reader-thread errors are routed through the
     // thread pool's uncaught-exception handler (see StatsRecordingThreadPool).
     int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
-    executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads,
-        new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build()));
+    executor = new StatsRecordingThreadPool(numThreads, numThreads,
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
     // TODO: this should depend on input format and be in a map, or something.
     this.cvp = new OrcColumnVectorProducer(
         metadataCache, orcCache, bufferManager, conf, cacheMetrics, ioMetrics);

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 1befba7..1dcd2cd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.orc.impl.DataReaderProperties;
 import org.apache.orc.impl.OrcIndex;
+import org.apache.tez.common.counters.TezCounters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -921,4 +922,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       return orcDataReader.readStripeFooter(stripe);
     }
   }
+
+  /** Returns the Tez counters held by this fragment's QueryFragmentCounters. */
+  public TezCounters getTezCounters() {
+    return counters.getTezCounters();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
index 2c3e53c..eda8862 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -14,11 +14,13 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 
 @InterfaceAudience.Private
 public class LlapTezUtils {
   public static boolean isSourceOfInterest(String inputClassName) {
@@ -26,4 +39,19 @@ public class LlapTezUtils {
     return !(inputClassName.equals(MRInputLegacy.class.getName()) || inputClassName.equals(
         MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
   }
+
+  /**
+   * Extracts the fragment identifier for the current task attempt from the
+   * job configuration.
+   *
+   * @param job job conf carrying {@code MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID}
+   * @return the attempt id with its "attempt" prefix stripped, or null when
+   *         the id is not present in the configuration
+   */
+  public static String getFragmentId(final JobConf job) {
+    String taskAttemptId = job.get(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
+    if (taskAttemptId != null) {
+      return stripAttemptPrefix(taskAttemptId);
+    }
+    return null;
+  }
+
+  /**
+   * Strips a leading {@code TezTaskAttemptID.ATTEMPT} prefix plus the one
+   * separator character following it; returns the input unchanged when the
+   * prefix is absent.
+   */
+  public static String stripAttemptPrefix(final String s) {
+    // NOTE(review): assumes a separator char follows the prefix; an input
+    // equal to the bare prefix would throw StringIndexOutOfBoundsException
+    // from substring() -- confirm callers always pass full attempt ids.
+    if (s.startsWith(TezTaskAttemptID.ATTEMPT)) {
+      return s.substring(TezTaskAttemptID.ATTEMPT.length() + 1);
+    }
+    return s;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 5aab0e5..67cd38d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
 import org.apache.hadoop.hive.ql.Context;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -80,6 +82,7 @@ public class TezJobMonitor {
   private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary";
   private static final String TASK_SUMMARY_HEADER = "Task Execution Summary";
   private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
+  private static final String FS_COUNTERS_SUMMARY_HEADER = "FileSystem Counters Summary";
 
   // keep this within 80 chars width. If more columns need to be added then update min terminal
   // width requirement and SEPARATOR width accordingly
@@ -106,6 +109,9 @@ public class TezJobMonitor {
       "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
       "ALLOCATION", "USED", "TOTAL_IO");
 
+  // FileSystem counters
+  private static final String FS_COUNTERS_HEADER_FORMAT = "%10s %15s %13s %18s %18s %13s";
+
   // Methods summary
   private static final String OPERATION_SUMMARY = "%-35s %9s";
   private static final String OPERATION = "OPERATION";
@@ -391,6 +397,10 @@ public class TezJobMonitor {
         console.printInfo(LLAP_IO_SUMMARY_HEADER);
         printLlapIOSummary(progressMap, console, dagClient);
         console.printInfo(SEPARATOR);
+        console.printInfo("");
+
+        console.printInfo(FS_COUNTERS_SUMMARY_HEADER);
+        printFSCountersSummary(progressMap, console, dagClient);
       }
 
       console.printInfo("");
@@ -697,6 +707,62 @@ public class TezJobMonitor {
     }
   }
 
+  /**
+   * Prints one table per filesystem scheme summarizing per-vertex FileSystem
+   * counters (bytes read/written, read/large-read/write ops) fetched from the
+   * DAG's vertex counters.
+   *
+   * NOTE(review): dagClient.getVertexStatus() is called once per
+   * (scheme, vertex) pair, so vertex counters are re-fetched for every
+   * scheme; consider fetching them once per vertex if this RPC is costly.
+   */
+  private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console,
+      DAGClient dagClient) {
+    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
+    // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
+    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+      // NOTE(review): toUpperCase() uses the default locale; scheme names are
+      // ASCII, so Locale.ROOT would avoid locale-dependent casing surprises.
+      final String scheme = statistics.getScheme().toUpperCase();
+      final String fsCountersHeader = String.format(FS_COUNTERS_HEADER_FORMAT,
+          "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");
+
+      console.printInfo("");
+      reprintLineWithColorAsBold("Scheme: " + scheme, Ansi.Color.RED);
+      console.printInfo(SEPARATOR);
+      reprintLineWithColorAsBold(fsCountersHeader, Ansi.Color.CYAN);
+      console.printInfo(SEPARATOR);
+
+      for (String vertexName : keys) {
+        TezCounters vertexCounters = null;
+        try {
+          vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
+              .getVertexCounters();
+        } catch (IOException e) {
+          // best attempt, shouldn't really kill DAG for this
+        } catch (TezException e) {
+          // best attempt, shouldn't really kill DAG for this
+        }
+        if (vertexCounters != null) {
+          // FileSystemCounter group keys counters as "<SCHEME>_<COUNTER_NAME>".
+          final String counterGroup = FileSystemCounter.class.getName();
+          final long bytesRead = getCounterValueByGroupName(vertexCounters,
+              counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name());
+          final long bytesWritten = getCounterValueByGroupName(vertexCounters,
+              counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name());
+          final long readOps = getCounterValueByGroupName(vertexCounters,
+              counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name());
+          final long largeReadOps = getCounterValueByGroupName(vertexCounters,
+              counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name());
+          final long writeOps = getCounterValueByGroupName(vertexCounters,
+              counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name());
+
+          String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT,
+              vertexName,
+              humanReadableByteCount(bytesRead),
+              readOps,
+              largeReadOps,
+              humanReadableByteCount(bytesWritten),
+              writeOps);
+          console.printInfo(fsCountersSummary);
+        }
+      }
+
+      console.printInfo(SEPARATOR);
+    }
+  }
+
   private void printStatusInPlace(Map<String, Progress> progressMap, long startTime,
       boolean vextexStatusFromAM, DAGClient dagClient) {
     StringBuilder reportBuffer = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
index 81bda08..412f45c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.hooks;
 
 import java.util.List;
 
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.tez.common.counters.FileSystemCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -57,12 +59,30 @@ public class PostExecTezSummaryPrinter implements ExecuteWithHookContext {
       LOG.info("Printing summary for tez task: " + tezTask.getName());
       TezCounters counters = tezTask.getTezCounters();
       if (counters != null) {
+        String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
         for (CounterGroup group : counters) {
-          if ("HIVE".equals(group.getDisplayName())) {
+          if (hiveCountersGroup.equals(group.getDisplayName())) {
             console.printError(tezTask.getId() + " HIVE COUNTERS:");
             for (TezCounter counter : group) {
               console.printError("   " + counter.getDisplayName() + ": " + counter.getValue());
             }
+          } else if (group.getName().equals(FileSystemCounter.class.getName())) {
+            console.printError(tezTask.getId() + " FILE SYSTEM COUNTERS:");
+            for (TezCounter counter : group) {
+              // HDFS counters should be relatively consistent across test runs when compared to
+              // local file system counters
+              if (counter.getName().contains("HDFS")) {
+                console.printError("   " + counter.getDisplayName() + ": " + counter.getValue());
+              }
+            }
+          } else if (group.getName().equals(LlapIOCounters.class.getName())) {
+            console.printError(tezTask.getId() + " LLAP IO COUNTERS:");
+            List<String> testSafeCounters = LlapIOCounters.testSafeCounterNames();
+            for (TezCounter counter : group) {
+              if (testSafeCounters.contains(counter.getDisplayName())) {
+                console.printError("   " + counter.getDisplayName() + ": " + counter.getValue());
+              }
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/test/queries/clientpositive/orc_llap_counters.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_llap_counters.q b/ql/src/test/queries/clientpositive/orc_llap_counters.q
new file mode 100644
index 0000000..1bd55d3
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/orc_llap_counters.q
@@ -0,0 +1,182 @@
+set hive.mapred.mode=nonstrict;
+SET hive.optimize.index.filter=true;
+SET hive.cbo.enable=false;
+SET hive.vectorized.execution.enabled=true;
+SET hive.llap.io.enabled=true;
+
+CREATE TABLE staging(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE staging;
+LOAD DATA LOCAL INPATH '../../data/files/over1k' INTO TABLE staging;
+
+CREATE TABLE orc_ppd_staging(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           c char(50),
+           v varchar(50),
+           da date,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*");
+
+insert overwrite table orc_ppd_staging select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from staging order by t, s;
+
+-- just to introduce a gap in min/max range for bloom filters. The dataset has contiguous values
+-- which makes it hard to test bloom filters
+insert into orc_ppd_staging select -10,-321,-65680,-4294967430,-97.94,-13.07,true,"aaa","aaa","aaa","1990-03-11","1990-03-11 10:11:58.703308",-71.54,"aaa" from staging limit 1;
+insert into orc_ppd_staging select 127,331,65690,4294967440,107.94,23.07,true,"zzz","zzz","zzz","2023-03-11","2023-03-11 10:11:58.703308",71.54,"zzz" from staging limit 1;
+
+CREATE TABLE orc_ppd(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           c char(50),
+           v varchar(50),
+           da date,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*");
+
+insert overwrite table orc_ppd select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from orc_ppd_staging order by t, s;
+
+describe formatted orc_ppd;
+
+SET hive.fetch.task.conversion=none;
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecTezSummaryPrinter;
+
+-- Row group statistics for column t:
+-- Entry 0: count: 994 hasNull: true min: -10 max: 54 sum: 26014 positions: 0,0,0,0,0,0,0
+-- Entry 1: count: 1000 hasNull: false min: 54 max: 118 sum: 86812 positions: 0,2,124,0,0,116,11
+-- Entry 2: count: 100 hasNull: false min: 118 max: 127 sum: 12151 positions: 0,4,119,0,0,244,19
+
+-- INPUT_RECORDS: 2100 (all row groups)
+select count(*) from orc_ppd;
+
+-- INPUT_RECORDS: 0 (no row groups)
+select count(*) from orc_ppd where t > 127;
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where t = 55;
+select count(*) from orc_ppd where t <=> 50;
+select count(*) from orc_ppd where t <=> 100;
+
+-- INPUT_RECORDS: 2000 (2 row groups)
+select count(*) from orc_ppd where t = "54";
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where t = -10.0;
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where t = cast(53 as float);
+select count(*) from orc_ppd where t = cast(53 as double);
+
+-- INPUT_RECORDS: 2000 (2 row groups)
+select count(*) from orc_ppd where t < 100;
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where t < 100 and t > 98;
+
+-- INPUT_RECORDS: 2000 (2 row groups)
+select count(*) from orc_ppd where t <= 100;
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where t is null;
+
+-- INPUT_RECORDS: 1100 (2 row groups)
+select count(*) from orc_ppd where t in (5, 120);
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where t between 60 and 80;
+
+-- bloom filter tests
+-- INPUT_RECORDS: 0
+select count(*) from orc_ppd where t = -100;
+select count(*) from orc_ppd where t <=> -100;
+select count(*) from orc_ppd where t = 125;
+select count(*) from orc_ppd where t IN (-100, 125, 200);
+
+-- Row group statistics for column s:
+-- Entry 0: count: 1000 hasNull: false min:  max: zach young sum: 12907 positions: 0,0,0
+-- Entry 1: count: 1000 hasNull: false min: alice allen max: zach zipper sum: 12704 positions: 0,1611,191
+-- Entry 2: count: 100 hasNull: false min: bob davidson max: zzz sum: 1281 positions: 0,3246,373
+
+-- INPUT_RECORDS: 0 (no row groups)
+select count(*) from orc_ppd where s > "zzz";
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where s = "zach young";
+select count(*) from orc_ppd where s <=> "zach zipper";
+select count(*) from orc_ppd where s <=> "";
+
+-- INPUT_RECORDS: 0
+select count(*) from orc_ppd where s is null;
+
+-- INPUT_RECORDS: 2100
+select count(*) from orc_ppd where s is not null;
+
+-- INPUT_RECORDS: 0
+select count(*) from orc_ppd where s = cast("zach young" as char(50));
+
+-- INPUT_RECORDS: 1000 (1 row group)
+select count(*) from orc_ppd where s = cast("zach young" as char(10));
+select count(*) from orc_ppd where s = cast("zach young" as varchar(10));
+select count(*) from orc_ppd where s = cast("zach young" as varchar(50));
+
+-- INPUT_RECORDS: 2000 (2 row groups)
+select count(*) from orc_ppd where s < "b";
+
+-- INPUT_RECORDS: 2000 (2 row groups)
+select count(*) from orc_ppd where s > "alice" and s < "bob";
+
+-- INPUT_RECORDS: 2000 (2 row groups)
+select count(*) from orc_ppd where s in ("alice allen", "");
+
+-- INPUT_RECORDS: 2000 (2 row groups)
+select count(*) from orc_ppd where s between "" and "alice allen";
+
+-- INPUT_RECORDS: 100 (1 row group)
+select count(*) from orc_ppd where s between "zz" and "zzz";
+
+-- INPUT_RECORDS: 1100 (2 row groups)
+select count(*) from orc_ppd where s between "zach zipper" and "zzz";
+
+-- bloom filter tests
+-- INPUT_RECORDS: 0
+select count(*) from orc_ppd where s = "hello world";
+select count(*) from orc_ppd where s <=> "apache hive";
+select count(*) from orc_ppd where s IN ("a", "z");
+
+-- INPUT_RECORDS: 100
+select count(*) from orc_ppd where s = "sarah ovid";
+
+-- INPUT_RECORDS: 1100
+select count(*) from orc_ppd where s = "wendy king";
+
+-- INPUT_RECORDS: 1000
+select count(*) from orc_ppd where s = "wendy king" and t < 0;
+
+-- INPUT_RECORDS: 100
+select count(*) from orc_ppd where s = "wendy king" and t > 100;

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/test/queries/clientpositive/orc_llap_counters1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/orc_llap_counters1.q b/ql/src/test/queries/clientpositive/orc_llap_counters1.q
new file mode 100644
index 0000000..06d6c4f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/orc_llap_counters1.q
@@ -0,0 +1,83 @@
+set hive.mapred.mode=nonstrict;
+SET hive.optimize.index.filter=true;
+SET hive.cbo.enable=false;
+SET hive.vectorized.execution.enabled=true;
+SET hive.llap.io.enabled=true;
+
+CREATE TABLE staging(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
+STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../../data/files/over1k' OVERWRITE INTO TABLE staging;
+LOAD DATA LOCAL INPATH '../../data/files/over1k' INTO TABLE staging;
+
+CREATE TABLE orc_ppd_staging(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           c char(50),
+           v varchar(50),
+           da date,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*");
+
+insert overwrite table orc_ppd_staging select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from staging order by t, s;
+
+-- just to introduce a gap in min/max range for bloom filters. The dataset has contiguous values
+-- which makes it hard to test bloom filters
+insert into orc_ppd_staging select -10,-321,-65680,-4294967430,-97.94,-13.07,true,"aaa","aaa","aaa","1990-03-11","1990-03-11 10:11:58.703308",-71.54,"aaa" from staging limit 1;
+insert into orc_ppd_staging select 127,331,65690,4294967440,107.94,23.07,true,"zzz","zzz","zzz","2023-03-11","2023-03-11 10:11:58.703308",71.54,"zzz" from staging limit 1;
+
+CREATE TABLE orc_ppd(t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           c char(50),
+           v varchar(50),
+           da date,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+STORED AS ORC tblproperties("orc.row.index.stride" = "1000", "orc.bloom.filter.columns"="*");
+
+insert overwrite table orc_ppd select t, si, i, b, f, d, bo, s, cast(s as char(50)), cast(s as varchar(50)), cast(ts as date), ts, dec, bin from orc_ppd_staging order by t, s;
+
+describe formatted orc_ppd;
+
+SET hive.fetch.task.conversion=none;
+SET hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.PostExecTezSummaryPrinter;
+
+-- Row group statistics for column t:
+-- Entry 0: count: 994 hasNull: true min: -10 max: 54 sum: 26014 positions: 0,0,0,0,0,0,0
+-- Entry 1: count: 1000 hasNull: false min: 54 max: 118 sum: 86812 positions: 0,2,124,0,0,116,11
+-- Entry 2: count: 100 hasNull: false min: 118 max: 127 sum: 12151 positions: 0,4,119,0,0,244,19
+
+-- INPUT_RECORDS: 2100 (all row groups)
+select count(*) from orc_ppd where t > -100;
+
+-- 100% LLAP cache hit
+select count(*) from orc_ppd where t > -100;
+
+DROP TABLE staging;
+DROP TABLE orc_ppd_staging;
+DROP TABLE orc_ppd;

http://git-wip-us.apache.org/repos/asf/hive/blob/5b2c36a4/ql/src/test/results/clientpositive/llap/orc_llap.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/orc_llap.q.out b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
index e62fd92..8ef0c5c 100644
--- a/ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ b/ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -81,7 +81,7 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@alltypesorc
 POSTHOOK: Output: default@cross_numbers
 POSTHOOK: Lineage: cross_numbers.i EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-Warning: Shuffle Join MERGEJOIN[12][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[10][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
 PREHOOK: query: insert into table orc_llap
 select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2
 from alltypesorc cross join cross_numbers
@@ -121,7 +121,7 @@ POSTHOOK: Output: default@orc_llap_small
 POSTHOOK: Lineage: orc_llap_small.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ]
 POSTHOOK: Lineage: orc_llap_small.csmallint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
 POSTHOOK: Lineage: orc_llap_small.ctinyint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
-Warning: Map Join MAPJOIN[16][bigTable=?] in task 'Map 1' is a cross product
+Warning: Map Join MAPJOIN[14][bigTable=?] in task 'Map 1' is a cross product
 PREHOOK: query: -- Cross join with no projection - do it on small table
 explain
 select count(1) from orc_llap_small y join orc_llap_small x
@@ -137,6 +137,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-1
     Tez
+#### A masked pattern was here ####
       Edges:
         Map 1 <- Map 3 (BROADCAST_EDGE)
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
@@ -157,8 +158,7 @@ STAGE PLANS:
                         1 
                       input vertices:
                         1 Map 3
-                      Statistics: Num rows: 112 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
-                      HybridGraceHashJoin: true
+                      Statistics: Num rows: 225 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
                       Group By Operator
                         aggregations: count(1)
                         mode: hash
@@ -183,7 +183,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: vectorized, uber
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -194,8 +194,8 @@ STAGE PLANS:
                   compressed: false
                   Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                   table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
@@ -204,7 +204,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[16][bigTable=?] in task 'Map 1' is a cross product
+Warning: Map Join MAPJOIN[14][bigTable=?] in task 'Map 1' is a cross product
 PREHOOK: query: select count(1) from orc_llap_small y join orc_llap_small x
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_llap_small
@@ -278,6 +278,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -331,6 +334,18 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -384,6 +399,7 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -414,6 +430,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-1
     Tez
+#### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -442,7 +459,7 @@ STAGE PLANS:
             Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: vectorized, uber
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -454,8 +471,8 @@ STAGE PLANS:
                   compressed: false
                   Statistics: Num rows: 61440 Data size: 14539970 Basic stats: COMPLETE Column stats: NONE
                   table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
@@ -476,6 +493,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ]
+POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -506,8 +526,9 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-1
     Tez
+#### A masked pattern was here ####
       Edges:
-        Map 1 <- Map 2 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -518,48 +539,62 @@ STAGE PLANS:
                   Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
                     predicate: (csmallint is not null and cbigint is not null) (type: boolean)
-                    Statistics: Num rows: 30720 Data size: 7269985 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 csmallint (type: smallint)
-                        1 csmallint (type: smallint)
-                      outputColumnNames: _col6, _col22
-                      input vertices:
-                        1 Map 2
-                      Statistics: Num rows: 33792 Data size: 7996983 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Select Operator
-                        expressions: _col6 (type: string), _col22 (type: string)
-                        outputColumnNames: _col0, _col1
-                        Statistics: Num rows: 33792 Data size: 7996983 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 33792 Data size: 7996983 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: csmallint (type: smallint), cstring1 (type: string)
+                      outputColumnNames: _col0, _col2
+                      Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: smallint)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: smallint)
+                        Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Map 2 
+        Map 3 
             Map Operator Tree:
                 TableScan
-                  alias: o2
+                  alias: o1
                   filterExpr: (csmallint is not null and cbigint is not null) (type: boolean)
                   Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
                     predicate: (csmallint is not null and cbigint is not null) (type: boolean)
-                    Statistics: Num rows: 30720 Data size: 7269985 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: csmallint (type: smallint)
-                      sort order: +
-                      Map-reduce partition columns: csmallint (type: smallint)
-                      Statistics: Num rows: 30720 Data size: 7269985 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: cstring2 (type: string)
+                    Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: csmallint (type: smallint), cstring2 (type: string)
+                      outputColumnNames: _col0, _col2
+                      Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: smallint)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: smallint)
+                        Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: smallint)
+                  1 _col0 (type: smallint)
+                outputColumnNames: _col2, _col5
+                Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col2 (type: string), _col5 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 135168 Data size: 31987934 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -579,6 +614,8 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring2, type:string, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -588,7 +625,7 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@llap_temp_table
 #### A masked pattern was here ####
 -735462183586256
-Warning: Map Join MAPJOIN[12][bigTable=?] in task 'Map 1' is a cross product
+Warning: Map Join MAPJOIN[10][bigTable=?] in task 'Map 1' is a cross product
 PREHOOK: query: -- multi-stripe test
 insert into table orc_llap
 select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2
@@ -669,6 +706,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -722,6 +762,18 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cbigint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cbigint, type:bigint, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cboolean1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean1, type:boolean, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cboolean2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cboolean2, type:boolean, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cdouble SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cdouble, type:double, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cfloat SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cfloat, type:float, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cint, type:int, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.csmallint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:csmallint, type:smallint, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.ctimestamp1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.ctimestamp2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.ctinyint SIMPLE [(orc_llap)orc_llap.FieldSchema(name:ctinyint, type:tinyint, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -775,6 +827,7 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -805,6 +858,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-1
     Tez
+#### A masked pattern was here ####
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
 #### A masked pattern was here ####
@@ -813,40 +867,40 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: orc_llap
-                  Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: cstring1 (type: string), cstring2 (type: string)
                     outputColumnNames: cstring1, cstring2
-                    Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
                       aggregations: count()
                       keys: cstring1 (type: string), cstring2 (type: string)
                       mode: hash
                       outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE
+                      Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
                         key expressions: _col0 (type: string), _col1 (type: string)
                         sort order: ++
                         Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                        Statistics: Num rows: 8014 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE
+                        Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col2 (type: bigint)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
         Reducer 2 
-            Execution mode: vectorized, uber
+            Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
                 keys: KEY._col0 (type: string), KEY._col1 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 4007 Data size: 801469 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 4007 Data size: 801469 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 122880 Data size: 29079940 Basic stats: COMPLETE Column stats: NONE
                   table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
@@ -867,6 +921,9 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.c2 EXPRESSION [(orc_llap)orc_llap.null, ]
+POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)orc_llap.FieldSchema(name:cstring2, type:string, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table
@@ -897,8 +954,9 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-1
     Tez
+#### A masked pattern was here ####
       Edges:
-        Map 1 <- Map 2 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -906,51 +964,65 @@ STAGE PLANS:
                 TableScan
                   alias: o1
                   filterExpr: (csmallint is not null and cbigint is not null) (type: boolean)
-                  Statistics: Num rows: 14311 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
                     predicate: (csmallint is not null and cbigint is not null) (type: boolean)
-                    Statistics: Num rows: 3578 Data size: 400762 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 csmallint (type: smallint)
-                        1 csmallint (type: smallint)
-                      outputColumnNames: _col6, _col22
-                      input vertices:
-                        1 Map 2
-                      Statistics: Num rows: 3935 Data size: 440838 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Select Operator
-                        expressions: _col6 (type: string), _col22 (type: string)
-                        outputColumnNames: _col0, _col1
-                        Statistics: Num rows: 3935 Data size: 440838 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 3935 Data size: 440838 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: csmallint (type: smallint), cstring1 (type: string)
+                      outputColumnNames: _col0, _col2
+                      Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: smallint)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: smallint)
+                        Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Map 2 
+        Map 3 
             Map Operator Tree:
                 TableScan
-                  alias: o2
+                  alias: o1
                   filterExpr: (csmallint is not null and cbigint is not null) (type: boolean)
-                  Statistics: Num rows: 14311 Data size: 1602939 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
                   Filter Operator
                     predicate: (csmallint is not null and cbigint is not null) (type: boolean)
-                    Statistics: Num rows: 3578 Data size: 400762 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: csmallint (type: smallint)
-                      sort order: +
-                      Map-reduce partition columns: csmallint (type: smallint)
-                      Statistics: Num rows: 3578 Data size: 400762 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: cstring2 (type: string)
+                    Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: csmallint (type: smallint), cstring2 (type: string)
+                      outputColumnNames: _col0, _col2
+                      Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: smallint)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: smallint)
+                        Statistics: Num rows: 245760 Data size: 58159880 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: smallint)
+                  1 _col0 (type: smallint)
+                outputColumnNames: _col2, _col5
+                Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col2 (type: string), _col5 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 270336 Data size: 63975869 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -970,6 +1042,8 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@orc_llap
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@llap_temp_table
+POSTHOOK: Lineage: llap_temp_table.cstring1 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring1, type:string, comment:null), ]
+POSTHOOK: Lineage: llap_temp_table.cstring2 SIMPLE [(orc_llap)o1.FieldSchema(name:cstring2, type:string, comment:null), ]
 PREHOOK: query: select sum(hash(*)) from llap_temp_table
 PREHOOK: type: QUERY
 PREHOOK: Input: default@llap_temp_table


Mime
View raw message