impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [3/6] incubator-impala git commit: IMPALA-3342: Add thread counters to monitor plan fragment execution
Date Wed, 23 Nov 2016 08:25:24 GMT
IMPALA-3342: Add thread counters to monitor plan fragment execution

This change removes the use of total_cpu_timer, which measures CPU
time incorrectly. It adds THREAD_COUNTERS to measure the user and sys
time spent in plan fragment execution. This also accounts for the
time spent in the hdfs/kudu scanner and in a blocking join.

Snippet of a query plan with the newly added PlanFragment
THREAD_COUNTERS:
      Instance 2b40b101e2626e7a:a3d8f2300000000
         - PeakMemoryUsage: 32.02 KB (32784)
         - PerHostPeakMemUsage: 430.52 MB (451431312)
         - RowsProduced: 1 (1)
         - TotalNetworkReceiveTime: 10s379ms
         - TotalNetworkSendTime: 0.000ns
         - TotalStorageWaitTime: 0.000ns
         - TotalWallClockTime: 10s577ms
           - SysTime: 8.000ms
           - UserTime: 8.000ms
         - VoluntaryContextSwitches: 80 (80)

Change-Id: Ifa88aa6f3371fa42d11ecc122f43c7d83623c300
Reviewed-on: http://gerrit.cloudera.org:8080/4633
Reviewed-by: Bharath Vissapragada <bharathv@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bb1c6338
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bb1c6338
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bb1c6338

Branch: refs/heads/master
Commit: bb1c63380b8ec14fb6058f3157cad8746463e054
Parents: 8f2bb2f
Author: aphadke <aphadke@cloudera.com>
Authored: Thu Oct 6 22:10:51 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Wed Nov 23 00:56:48 2016 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc        |  2 +-
 be/src/exec/hdfs-scan-node.cc            |  2 +-
 be/src/exec/kudu-scan-node.cc            |  3 +--
 be/src/runtime/plan-fragment-executor.cc | 16 +++-------------
 be/src/runtime/plan-fragment-executor.h  |  3 ---
 be/src/runtime/runtime-state.cc          |  2 +-
 be/src/runtime/runtime-state.h           | 14 ++++++++++----
 7 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index b114451..0e23727 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -145,9 +145,9 @@ void BlockingJoinNode::Close(RuntimeState* state) {
 
 void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* build_sink,
     Promise<Status>* status) {
+  SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
   Status s;
   {
-    SCOPED_TIMER(state->total_cpu_timer());
     if  (build_sink == NULL){
       s = ProcessBuildInput(state);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 3798107..eebf075 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -356,7 +356,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
 
 void HdfsScanNode::ScannerThread() {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
-  SCOPED_TIMER(runtime_state_->total_cpu_timer());
+  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
 
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
   // scanner for finer-grained filtering.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index f98077e..40689ee 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -273,8 +273,7 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t
 void KuduScanNode::RunScannerThread(const string& name, const string* initial_token) {
   DCHECK(initial_token != NULL);
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
-  SCOPED_TIMER(runtime_state_->total_cpu_timer());
-
+  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
   // Set to true if this thread observes that the number of optional threads has been
   // exceeded and is exiting early.
   bool optional_thread_exiting = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 9db4d38..f673f34 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -39,6 +39,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter-bank.h"
 #include "util/container-util.h"
+#include "runtime/runtime-state.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/mem-info.h"
@@ -107,7 +108,6 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
   is_prepared_ = true;
 
   // TODO: Break this method up.
-  fragment_sw_.Start();
   const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx;
   query_id_ = request.query_ctx.query_id;
 
@@ -302,6 +302,7 @@ Status PlanFragmentExecutor::Open() {
 }
 
 Status PlanFragmentExecutor::OpenInternal() {
+  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
   RETURN_IF_ERROR(
       runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
 
@@ -357,6 +358,7 @@ Status PlanFragmentExecutor::Exec() {
 Status PlanFragmentExecutor::ExecInternal() {
   RuntimeProfile::Counter* plan_exec_timer =
       ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
+  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
   bool exec_tree_complete = false;
   do {
     Status status;
@@ -467,18 +469,6 @@ void PlanFragmentExecutor::FragmentComplete() {
   // Check the atomic flag. If it is set, then a fragment complete report has already
   // been sent.
   bool send_report = completed_report_sent_.CompareAndSwap(0, 1);
-
-  fragment_sw_.Stop();
-  int64_t cpu_and_wait_time = fragment_sw_.ElapsedTime();
-  fragment_sw_ = MonotonicStopWatch();
-  int64_t cpu_time = cpu_and_wait_time
-      - runtime_state_->total_storage_wait_timer()->value()
-      - runtime_state_->total_network_send_timer()->value()
-      - runtime_state_->total_network_receive_timer()->value();
-  // Timing is not perfect.
-  if (cpu_time < 0) cpu_time = 0;
-  runtime_state_->total_cpu_timer()->Add(cpu_time);
-
   ReleaseThreadToken();
   StopReportThread();
   if (send_report) SendReport(true);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 7149272..708e979 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -242,9 +242,6 @@ class PlanFragmentExecutor {
   /// of the execution.
   RuntimeProfile::Counter* average_thread_tokens_;
 
-  /// Stopwatch for this entire fragment. Started in Prepare(), stopped in Close().
-  MonotonicStopWatch fragment_sw_;
-
   /// (Atomic) Flag that indicates whether a completed fragment report has been or will
   /// be fired. It is initialized to 0 and atomically swapped to 1 when a completed
   /// fragment report is about to be fired. Used for reducing the probability that a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index c2adcb7..40a4946 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -134,7 +134,7 @@ Status RuntimeState::Init(ExecEnv* exec_env) {
     DCHECK(resource_pool_ != NULL);
   }
 
-  total_cpu_timer_ = ADD_TIMER(runtime_profile(), "TotalCpuTime");
+  total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "");
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
   total_network_send_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkSendTime");
   total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bb1c6338/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 0ada7e4..f86374e 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -251,17 +251,22 @@ class RuntimeState {
   bool is_cancelled() const { return is_cancelled_; }
   void set_is_cancelled(bool v) { is_cancelled_ = v; }
 
-  RuntimeProfile::Counter* total_cpu_timer() { return total_cpu_timer_; }
   RuntimeProfile::Counter* total_storage_wait_timer() {
     return total_storage_wait_timer_;
   }
+
   RuntimeProfile::Counter* total_network_send_timer() {
     return total_network_send_timer_;
   }
+
   RuntimeProfile::Counter* total_network_receive_timer() {
     return total_network_receive_timer_;
   }
 
+  RuntimeProfile::ThreadCounters* total_thread_statistics() const {
+   return total_thread_statistics_;
+  }
+
   /// Sets query_status_ with err_msg if no error has been set yet.
   void SetQueryStatus(const std::string& err_msg) {
     boost::lock_guard<SpinLock> l(query_status_lock_);
@@ -351,9 +356,6 @@ class RuntimeState {
 
   RuntimeProfile profile_;
 
-  /// Total CPU time (across all threads), including all wait times.
-  RuntimeProfile::Counter* total_cpu_timer_;
-
   /// Total time waiting in storage (across all threads)
   RuntimeProfile::Counter* total_storage_wait_timer_;
 
@@ -363,6 +365,9 @@ class RuntimeState {
   /// Total time spent receiving over the network (across all threads)
   RuntimeProfile::Counter* total_network_receive_timer_;
 
+  /// Total CPU utilization for all threads in this plan fragment.
+  RuntimeProfile::ThreadCounters* total_thread_statistics_;
+
   /// MemTracker that is shared by all fragment instances running on this host.
   /// The query mem tracker must be released after the instance_mem_tracker_.
   std::shared_ptr<MemTracker> query_mem_tracker_;
@@ -405,6 +410,7 @@ class RuntimeState {
 
   /// prohibit copies
   RuntimeState(const RuntimeState&);
+
 };
 
 #define RETURN_IF_CANCELLED(state) \


Mime
View raw message