Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1E5CDA53 for ; Mon, 1 Oct 2012 04:28:55 +0000 (UTC) Received: (qmail 21836 invoked by uid 500); 1 Oct 2012 04:28:55 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 21704 invoked by uid 500); 1 Oct 2012 04:28:54 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 21655 invoked by uid 99); 1 Oct 2012 04:28:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Oct 2012 04:28:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Oct 2012 04:28:50 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6E5BF23889C5 for ; Mon, 1 Oct 2012 04:28:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1392202 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: QueryPlan.java exec/HadoopJobExecHelper.java plan/ReducerTimeStatsPerJob.java Date: Mon, 01 Oct 2012 04:28:07 -0000 To: commits@hive.apache.org From: cws@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121001042807.6E5BF23889C5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cws Date: Mon Oct 1 04:28:06 2012 New Revision: 1392202 URL: http://svn.apache.org/viewvc?rev=1392202&view=rev Log: add instrumentation to capture if there is skew in reducers (Arun Dobriya 
via cws) Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (with props) Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java?rev=1392202&r1=1392201&r2=1392202&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java Mon Oct 1 04:28:06 2012 @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.hooks.R import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.plan.api.AdjacencyType; import org.apache.hadoop.hive.ql.plan.api.NodeType; import org.apache.hadoop.hive.ql.plan.api.TaskType; @@ -67,6 +68,7 @@ public class QueryPlan implements Serial private ArrayList> rootTasks; private FetchTask fetchTask; + private final List<ReducerTimeStatsPerJob> reducerTimeStatsPerJobList; private HashSet inputs; /** @@ -94,12 +96,14 @@ public class QueryPlan implements Serial private transient Long queryStartTime; public QueryPlan() { + this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>(); } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime) { this.queryString = queryString; rootTasks = new ArrayList>(); + this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>(); rootTasks.addAll(sem.getRootTasks()); fetchTask = sem.getFetchTask(); // Note that inputs and outputs can be changed when the query gets executed @@ -706,6 +710,10 @@ public class QueryPlan implements Serial return query; } + public List<ReducerTimeStatsPerJob> 
getReducerTimeStatsPerJobList() { + return this.reducerTimeStatsPerJobList; + } + public void setQuery(org.apache.hadoop.hive.ql.plan.api.Query query) { this.query = query; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java?rev=1392202&r1=1392201&r2=1392202&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java Mon Oct 1 04:28:06 2012 @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Op import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution; import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.stats.ClientStatsPublisher; @@ -696,6 +697,12 @@ public class HadoopJobExecHelper { // for special modes. In that case, SessionState.get() is empty. 
if (SessionState.get() != null) { SessionState.get().getLastMapRedStatsList().add(mapRedStats); + + // Computes the skew for all the MapReduce irrespective + // of Success or Failure + if (this.task.getQueryPlan() != null) { + computeReducerTimeStatsPerJob(rj); + } } boolean success = mapRedStats.isSuccess(); @@ -733,6 +740,31 @@ public class HadoopJobExecHelper { return returnVal; } + + private void computeReducerTimeStatsPerJob(RunningJob rj) throws IOException { + TaskCompletionEvent[] taskCompletions = rj.getTaskCompletionEvents(0); + List<Integer> reducersRunTimes = new ArrayList<Integer>(); + + for (TaskCompletionEvent taskCompletion : taskCompletions) { + String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(taskCompletion); + if (taskJobIds == null) { + // Task attempt info is unavailable in this Hadoop version"); + continue; + } + String taskId = taskJobIds[0]; + if (!taskCompletion.isMapTask()) { + reducersRunTimes.add(new Integer(taskCompletion.getTaskRunTime())); + } + } + // Compute the reducers run time statistics for the job + ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes, + new String(this.jobId)); + // Adding the reducers run time statistics for the job in the QueryPlan + this.task.getQueryPlan().getReducerTimeStatsPerJobList().add(reducerTimeStatsPerJob); + return; + } + + private Map extractAllCounterValues(Counters counters) { Map exctractedCounters = new HashMap(); for (Counters.Group cg : counters) { Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java?rev=1392202&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java Mon Oct 1 04:28:06 2012 @@ 
-0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import java.util.List; + +/* + * Encapsulates statistics about the duration of all reduce tasks + * corresponding to a specific JobId. + * The stats are computed in the HadoopJobExecHelper when the + * job completes and then populated inside the QueryPlan for + * each job, from where it can be later on accessed. + * The reducer statistics consist of minimum/maximum/mean/stdv of the + * run times of all the reduce tasks for a job. All the Run times are + * in Milliseconds. + */ +public class ReducerTimeStatsPerJob { + + // stores the JobId of the job + private final String jobId; + + // Stores the temporal statistics in milliseconds for reducers + // specific to a Job + private final long minimumTime; + private final long maximumTime; + private final double meanTime; + private final double standardDeviationTime; + + + /* + * Computes the temporal run time statistics of the reducers + * for a specific JobId. 
+ */ + public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes, String jobId) { + this.jobId = jobId; + + // If no Run times present, then set -1, indicating no values + if (!reducersRunTimes.isEmpty()) { + long minimumTime = reducersRunTimes.get(0); + long maximumTime = reducersRunTimes.get(0); + long totalTime = reducersRunTimes.get(0); + double standardDeviationTime = 0.0; + double meanTime = 0.0; + + for (int i = 1; i < reducersRunTimes.size(); i++) { + if (reducersRunTimes.get(i) < minimumTime) { + minimumTime = reducersRunTimes.get(i); + } + if (reducersRunTimes.get(i) > maximumTime) { + maximumTime = reducersRunTimes.get(i); + } + totalTime += reducersRunTimes.get(i); + } + meanTime = (double) totalTime / reducersRunTimes.size(); + + for (int i = 0; i < reducersRunTimes.size(); i++) { + standardDeviationTime += Math.pow(meanTime - reducersRunTimes.get(i), 2); + } + standardDeviationTime /= reducersRunTimes.size(); + standardDeviationTime = Math.sqrt(standardDeviationTime); + + this.minimumTime = minimumTime; + this.maximumTime = maximumTime; + this.meanTime = meanTime; + this.standardDeviationTime = standardDeviationTime; + return; + } + this.minimumTime = -1; + this.maximumTime = -1; + this.meanTime = -1.0; + this.standardDeviationTime = -1.0; + return; + } + + public long getMinimumTime() { + return this.minimumTime; + } + + public long getMaximumTime() { + return this.maximumTime; + } + + public double getMeanTime() { + return this.meanTime; + } + + public double getStandardDeviationTime() { + return this.standardDeviationTime; + } + + public String getJobId() { + return this.jobId; + } + +} Propchange: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java ------------------------------------------------------------------------------ svn:eol-style = native