hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prasan...@apache.org
Subject svn commit: r1664774 - in /hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap: counters/ daemon/impl/ io/api/impl/ io/decode/ io/encoded/
Date Sat, 07 Mar 2015 01:16:49 GMT
Author: prasanthj
Date: Sat Mar  7 01:16:49 2015
New Revision: 1664774

URL: http://svn.apache.org/r1664774
Log:
HIVE-9888: LLAP: Provide per query counters (Prasanth Jayachandran)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java?rev=1664774&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
(added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
Sat Mar  7 01:16:49 2015
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.counters;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Per query counters.
+ */
+public class QueryFragmentCounters {
+
+  public static enum Counter {
+    NUM_VECTOR_BATCHES,
+    NUM_DECODED_BATCHES,
+    SELECTED_ROWGROUPS,
+    NUM_ERRORS,
+    ROWS_EMITTED
+  }
+
+  private String appId;
+  private Map<String, Long> counterMap;
+
+  public QueryFragmentCounters() {
+    this("Not Specified");
+  }
+
+  public QueryFragmentCounters(String applicationId) {
+    this.appId = applicationId;
+    this.counterMap = new ConcurrentHashMap<>();
+  }
+
+  public void incrCounter(Counter counter) {
+    incrCounter(counter, 1);
+  }
+
+  public void incrCounter(Counter counter, long delta) {
+    if (counterMap.containsKey(counter.name())) {
+      long val = counterMap.get(counter.name());
+      counterMap.put(counter.name(), val + delta);
+    } else {
+      setCounter(counter, delta);
+    }
+  }
+
+  public void setCounter(Counter counter, long value) {
+    counterMap.put(counter.name(), value);
+  }
+
+  @Override
+  public String toString() {
+    return "ApplicationId: " + appId + " Counters: " + counterMap;
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
Sat Mar  7 01:16:49 2015
@@ -167,7 +167,6 @@ public class ContainerRunnerImpl extends
       LOG.info("DEBUG: Registering request with the ShuffleHandler");
       ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken,
request.getUser());
 
-
       ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()),
           new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs,
           workingDir, credentials, memoryPerExecutor);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
Sat Mar  7 01:16:49 2015
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -110,12 +111,14 @@ public class LlapInputFormat
     /** Vector that is currently being processed by our user. */
     private boolean isDone = false, isClosed = false;
     private ConsumerFeedback<ColumnVectorBatch> feedback;
+    private final QueryFragmentCounters counters;
 
     public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols)
{
       this.split = split;
       this.columnIds = includedCols;
       this.sarg = SearchArgumentFactory.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
+      this.counters = new QueryFragmentCounters();
       try {
         rbCtx = new VectorizedRowBatchCtx();
         rbCtx.init(job, split);
@@ -175,7 +178,7 @@ public class LlapInputFormat
 
     private void startRead() {
       // Create the consumer of encoded data; it will coordinate decoding to CVBs.
-      ReadPipeline rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames);
+      ReadPipeline rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames,
counters);
       feedback = rp;
       ListenableFuture<Void> future = executor.submit(rp.getReadCallable());
       // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most
cases.
@@ -235,6 +238,7 @@ public class LlapInputFormat
         LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone
             + ", err " + pendingError + ", pending " + pendingData.size());
       }
+      LlapIoImpl.LOG.info("QueryFragmentCounters: " + counters);
       feedback.stop();
       rethrowErrorIfAny();
     }
@@ -253,6 +257,7 @@ public class LlapInputFormat
         LlapIoImpl.LOG.info("setDone called; closed " + isClosed
           + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size());
       }
+      LlapIoImpl.LOG.info("DONE: QueryFragmentCounters: " + counters);
       synchronized (pendingData) {
         isDone = true;
         pendingData.notifyAll();
@@ -276,9 +281,11 @@ public class LlapInputFormat
 
     @Override
     public void setError(Throwable t) {
+      counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS);
       LlapIoImpl.LOG.info("setError called; closed " + isClosed
         + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size());
       assert t != null;
+      LlapIoImpl.LOG.info("ERROR: QueryFragmentCounters: " + counters);
       synchronized (pendingData) {
         pendingError = t;
         pendingData.notifyAll();

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
Sat Mar  7 01:16:49 2015
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.llap.io.d
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.InputSplit;
@@ -30,5 +31,6 @@ import org.apache.hadoop.mapred.InputSpl
  */
 public interface ColumnVectorProducer {
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, InputSplit
split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames);
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters);
 }
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
Sat Mar  7 01:16:49 2015
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
@@ -43,7 +44,7 @@ public class OrcColumnVectorProducer imp
   private final Configuration conf;
   private boolean _skipCorrupt; // TODO: get rid of this
   private LlapDaemonCacheMetrics metrics;
- 
+
   public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
       LowLevelCacheImpl lowLevelCache, Cache<OrcCacheKey> cache, Configuration conf,
       LlapDaemonCacheMetrics metrics) {
@@ -62,12 +63,13 @@ public class OrcColumnVectorProducer imp
   @Override
   public ReadPipeline createReadPipeline(
       Consumer<ColumnVectorBatch> consumer, InputSplit split,
-      List<Integer> columnIds, SearchArgument sarg, String[] columnNames) {
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
+      QueryFragmentCounters counters) {
     metrics.incrCacheReadRequests();
-    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
-        consumer, columnIds.size(), _skipCorrupt);
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(
-        lowLevelCache, cache, metadataCache, conf, split, columnIds, sarg, columnNames, edc);
+    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
+        _skipCorrupt, counters);
+    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, cache, metadataCache,
+        conf, split, columnIds, sarg, columnNames, edc, counters);
     edc.init(reader, reader);
     return edc;
   }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
Sat Mar  7 01:16:49 2015
@@ -20,9 +20,9 @@ package org.apache.hadoop.hive.llap.io.d
 import java.io.IOException;
 
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader;
@@ -52,11 +52,14 @@ public class OrcEncodedDataConsumer exte
   private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
   private OrcStripeMetadata[] stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
+  private final QueryFragmentCounters counters;
 
   public OrcEncodedDataConsumer(
-      Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt) {
+      Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
+      QueryFragmentCounters counters) {
     super(consumer, colCount);
     this.skipCorrupt = skipCorrupt;
+    this.counters = counters;
   }
 
   public void setFileMetadata(OrcFileMetadata f) {
@@ -109,7 +112,10 @@ public class OrcEncodedDataConsumer exte
 
         // we are done reading a batch, send it to consumer for processing
         downstreamConsumer.consumeData(cvb);
+        counters.incrCounter(QueryFragmentCounters.Counter.ROWS_EMITTED, batchSize);
       }
+      counters.incrCounter(QueryFragmentCounters.Counter.NUM_VECTOR_BATCHES, maxBatchesRG);
+      counters.incrCounter(QueryFragmentCounters.Counter.NUM_DECODED_BATCHES);
     } catch (IOException e) {
       // Caller will return the batch.
       downstreamConsumer.setError(e);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java?rev=1664774&r1=1664773&r2=1664774&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
(original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
Sat Mar  7 01:16:49 2015
@@ -8,13 +8,13 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
@@ -30,12 +30,12 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
 import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -52,7 +52,7 @@ public class OrcEncodedDataReader extend
   private final SearchArgument sarg;
   private final String[] columnNames;
   private final OrcEncodedDataConsumer consumer;
-
+  private final QueryFragmentCounters counters;
 
   // Read state.
   private int stripeIxFrom;
@@ -70,7 +70,7 @@ public class OrcEncodedDataReader extend
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
       OrcMetadataCache metadataCache, Configuration conf, InputSplit split,
       List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      OrcEncodedDataConsumer consumer) {
+      OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
     this.cache = cache;
@@ -83,6 +83,7 @@ public class OrcEncodedDataReader extend
     this.sarg = sarg;
     this.columnNames = columnNames;
     this.consumer = consumer;
+    this.counters = counters;
   }
 
   @Override
@@ -459,6 +460,17 @@ public class OrcEncodedDataReader extend
         readState[stripeIxMod][j] = (rgsToRead == null) ? null :
           Arrays.copyOf(rgsToRead, rgsToRead.length);
       }
+
+      int count = 0;
+      if (rgsToRead != null) {
+        for (boolean b : rgsToRead) {
+          if (b)
+            count++;
+        }
+      } else {
+        count = rgCount;
+      }
+      counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count);
     }
   }
 



Mime
View raw message