hive-commits mailing list archives

From gunt...@apache.org
Subject svn commit: r1527812 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: optimizer/SetReducerParallelism.java parse/SetReducerParallelism.java parse/TezCompiler.java
Date Mon, 30 Sep 2013 22:53:36 GMT
Author: gunther
Date: Mon Sep 30 22:53:35 2013
New Revision: 1527812

URL: http://svn.apache.org/r1527812
Log:
HIVE-5378: Need to move SetReducerParallelism to the optimize package. (Vikram Dixit K via Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
Removed:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/SetReducerParallelism.java
Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java?rev=1527812&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java Mon Sep 30 22:53:35 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+/**
+ * SetReducerParallelism determines how many reducers should
+ * be run for a given reduce sink.
+ */
+public class SetReducerParallelism implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(SetReducerParallelism.class.getName());
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeTezProcContext context = (OptimizeTezProcContext) procContext;
+
+    ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+    ReduceSinkDesc desc = sink.getConf();
+
+    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+    int constantReducers = context.conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+
+    if (context.visitedReduceSinks.contains(sink)) {
+      // skip walking the children
+      LOG.debug("Already processed reduce sink: " + sink.getName());
+      return true;
+    }
+
+    context.visitedReduceSinks.add(sink);
+
+    try {
+      if (desc.getNumReducers() <= 0) {
+        if (constantReducers > 0) {
+          LOG.info("Parallelism for reduce sink "+sink+" set by user to "+constantReducers);
+          desc.setNumReducers(constantReducers);
+        } else {
+          long numberOfBytes = 0;
+
+          // we need to add up all the estimates from the siblings of this reduce sink
+          for (Operator<? extends OperatorDesc> sibling:
+            sink.getChildOperators().get(0).getParentOperators()) {
+            numberOfBytes += sibling.getStatistics(context.conf).getNumberOfBytes();
+          }
+
+          int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+              maxReducers, false);
+          LOG.info("Set parallelism for reduce sink "+sink+" to: "+numReducers);
+          desc.setNumReducers(numReducers);
+        }
+      } else {
+        LOG.info("Number of reducers determined to be: "+desc.getNumReducers());
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
+
+    return false;
+  }
+
+}

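[Editor's note, not part of the commit: the added class delegates the actual sizing to Utilities.estimateReducers. As a rough orientation, the arithmetic can be sketched as below; the clamping and rounding details, the simplified three-argument signature, and the numbers in main() are illustrative assumptions, not the real Hive implementation or its defaults.]

// Hypothetical sketch of the reducer-count arithmetic assumed to sit behind
// Utilities.estimateReducers: total estimated input bytes divided by the
// bytes-per-reducer target, clamped to [1, maxReducers]. The real method has
// a different signature and extra options (e.g. power-of-two rounding).
public final class ReducerEstimateSketch {

  public static int estimateReducers(long totalBytes, long bytesPerReducer, int maxReducers) {
    // Ceiling division: one reducer per bytesPerReducer of input, at least one.
    long reducers = (totalBytes + bytesPerReducer - 1) / bytesPerReducer;
    reducers = Math.max(1, reducers);
    // Never exceed the configured maximum.
    reducers = Math.min(maxReducers, reducers);
    return (int) reducers;
  }

  public static void main(String[] args) {
    // Example values (arbitrary, not Hive defaults): 10 GiB of estimated sibling
    // output, a 256 MiB per-reducer target, and a cap of 999 reducers.
    System.out.println(estimateReducers(10L << 30, 256L << 20, 999)); // prints 40
  }
}
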
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1527812&r1=1527811&r2=1527812&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Sep 30 22:53:35 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;



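[Editor's note, not part of the commit: the TezCompiler hunk above only shows the import being repointed at the new optimizer package. For orientation, here is a hypothetical sketch of how a NodeProcessor such as SetReducerParallelism is typically registered with Hive's rule-based graph walker. The rule name, the walker choice, and the parseContext field access are assumptions for illustration; TezCompiler's actual wiring is not shown in this diff.]

// Illustrative sketch only; not TezCompiler's code.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class SetReducerParallelismWiringSketch {

  public static void run(OptimizeTezProcContext procCtx) throws SemanticException {
    // Match every ReduceSinkOperator node ("RS...") and hand it to the processor.
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
        ReduceSinkOperator.getOperatorName() + "%"),
        new SetReducerParallelism());

    // Dispatch by rule while walking the operator DAG from the top operators.
    // (Accessing the parse context through procCtx.parseContext is an assumption.)
    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
    GraphWalker ogw = new DefaultGraphWalker(disp);
    List<Node> topNodes = new ArrayList<Node>(procCtx.parseContext.getTopOps().values());
    ogw.startWalking(topNodes, null);
  }
}
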