hive-commits mailing list archives

From: gunt...@apache.org
Subject: svn commit: r1633988 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/ ql/src/java/org/apache...
Date: Fri, 24 Oct 2014 02:42:20 GMT
Author: gunther
Date: Fri Oct 24 02:42:19 2014
New Revision: 1633988

URL: http://svn.apache.org/r1633988
Log:
HIVE-8409: SMB joins fail intermittently on tez (Vikram Dixit K via Gunther Hagleitner)

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Oct 24 02:42:19 2014
@@ -1823,7 +1823,11 @@ public class HiveConf extends Configurat
     TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE("hive.tez.dynamic.partition.pruning.max.event.size", 1*1024*1024L,
         "Maximum size of events sent by processors in dynamic pruning. If this size is crossed no pruning will take place."),
     TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE("hive.tez.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
-        "Maximum total data size of events in dynamic pruning.")
+        "Maximum total data size of events in dynamic pruning."),
+    TEZ_SMB_NUMBER_WAVES(
+        "hive.tez.smb.number.waves",
+        (float) 0.5,
+        "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave.")
     ;
 
     public final String varname;
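
The new TEZ_SMB_NUMBER_WAVES entry defaults to 0.5 and is read later in this patch through HiveConf.getFloatVar in CustomPartitionVertex. As a rough standalone illustration of what resolving such a float setting with a default amounts to (plain java.util.Properties here, not the HiveConf API):

    import java.util.Properties;

    public class SmbWavesConfigSketch {
      // Mirrors the new HiveConf entry hive.tez.smb.number.waves and its 0.5 default.
      static final String SMB_WAVES_KEY = "hive.tez.smb.number.waves";
      static final float SMB_WAVES_DEFAULT = 0.5f;

      // Resolve the waves factor from a plain properties bag, falling back to the default.
      static float getSmbWaves(Properties props) {
        String raw = props.getProperty(SMB_WAVES_KEY);
        return raw == null ? SMB_WAVES_DEFAULT : Float.parseFloat(raw);
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(getSmbWaves(props));    // 0.5 (default: half a wave)
        props.setProperty(SMB_WAVES_KEY, "1.0");
        System.out.println(getSmbWaves(props));    // 1.0 (one full wave)
      }
    }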

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Fri Oct 24 02:42:19 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -64,7 +62,6 @@ public class CommonMergeJoinOperator ext
   private static final long serialVersionUID = 1L;
   private boolean isBigTableWork;
   private static final Log LOG = LogFactory.getLog(CommonMergeJoinOperator.class.getName());
-  private Map<Integer, String> aliasToInputNameMap;
   transient List<Object>[] keyWritables;
   transient List<Object>[] nextKeyWritables;
   transient RowContainer<List<Object>>[] nextGroupStorage;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Fri Oct 24 02:42:19 2014
@@ -67,6 +67,7 @@ import org.apache.hadoop.util.StringUtil
  * different from regular operators in that it starts off by processing a
  * Writable data structure from a Table (instead of a Hive Object).
  **/
+@SuppressWarnings("deprecation")
 public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable {
 
   private static final long serialVersionUID = 1L;
@@ -177,7 +178,6 @@ public class MapOperator extends Operato
 
   private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx,
       StructObjectInspector tableRowOI) throws Exception {
-
     PartitionDesc pd = opCtx.partDesc;
     TableDesc td = pd.getTableDesc();
 
@@ -616,4 +616,16 @@ public class MapOperator extends Operato
   public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
     return MapRecordProcessor.getConnectOps();
   }
+
+  public void initializeContexts() {
+    Path fpath = getExecContext().getCurrentInputPath();
+    String nominalPath = getNominalPath(fpath);
+    Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
+    currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
+  }
+
+  public Deserializer getCurrentDeserializer() {
+
+    return currentCtxs[0].deserializer;
+  }
 }
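
The two methods added here are consumed by the Tez map-side changes later in this patch: MapRecordProcessor.getKeyValueReader() calls initializeContexts() and then getCurrentDeserializer() to hand the current input's deserializer to the key-value merger. A standalone sketch of that lookup pattern, with a hypothetical Ctx placeholder instead of the real MapOpCtx and Deserializer types:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CurrentContextSketch {
      // Hypothetical stand-in for MapOpCtx; here it only carries a deserializer name.
      static class Ctx {
        final String deserializer;
        Ctx(String deserializer) { this.deserializer = deserializer; }
      }

      private Ctx[] currentCtxs;

      // Models initializeContexts(): select the contexts registered for the nominal
      // path of the input currently being read.
      void initializeContexts(Map<String, List<Ctx>> opCtxMap, String nominalPath) {
        List<Ctx> contexts = opCtxMap.get(nominalPath);
        currentCtxs = contexts.toArray(new Ctx[contexts.size()]);
      }

      // Models getCurrentDeserializer(): the patch returns the first context's deserializer.
      String getCurrentDeserializer() {
        return currentCtxs[0].deserializer;
      }

      public static void main(String[] args) {
        CurrentContextSketch sketch = new CurrentContextSketch();
        Map<String, List<Ctx>> opCtxMap = new HashMap<String, List<Ctx>>();
        opCtxMap.put("/warehouse/t/000000_0", Arrays.asList(new Ctx("OrcSerde")));
        sketch.initializeContexts(opCtxMap, "/warehouse/t/000000_0");
        System.out.println(sketch.getCurrentDeserializer()); // OrcSerde
      }
    }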

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Oct 24 02:42:19 2014
@@ -409,7 +409,7 @@ public final class Utilities {
         }
         gWorkMap.put(path, gWork);
       } else {
-        LOG.debug("Found plan in cache.");
+        LOG.debug("Found plan in cache for name: " + name);
         gWork = gWorkMap.get(path);
       }
       return gWork;
@@ -1610,12 +1610,13 @@ public final class Utilities {
    * Group 6: copy     [copy keyword]
    * Group 8: 2        [copy file index]
    */
+  private static final String COPY_KEYWORD = "_copy_"; // copy keyword
   private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
       Pattern.compile("^.*?"+ // any prefix
                       "([0-9]+)"+ // taskId
                       "(_)"+ // separator
                       "([0-9]{1,6})?"+ // attemptId (limited to 6 digits)
-                      "((_)(\\Bcopy\\B)(_)"+ // copy keyword
+                      "((_)(\\Bcopy\\B)(_)" +
                       "([0-9]{1,6})$)?"+ // copy file index
                       "(\\..*)?$"); // any suffix/file extension
 
@@ -2010,6 +2011,15 @@ public final class Utilities {
     return false;
   }
 
+  public static String getBucketFileNameFromPathSubString(String bucketName) {
+    try {
+      return bucketName.split(COPY_KEYWORD)[0];
+    } catch (Exception e) {
+      e.printStackTrace();
+      return bucketName;
+    }
+  }
+
   public static String getNameMessage(Exception e) {
     return e.getClass().getName() + "(" + e.getMessage() + ")";
   }
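
The COPY_KEYWORD constant and getBucketFileNameFromPathSubString() strip the "_copy_N" suffix described in the regex comment above, so a bucket's extra copy files resolve to the same base name; CustomPartitionVertex below uses this to key its path-to-splits map. A standalone sketch of the same string handling, outside the Utilities class:

    public class BucketFileNameSketch {
      private static final String COPY_KEYWORD = "_copy_"; // same keyword the patch introduces

      // Return the base bucket file name, dropping any "_copy_N" suffix.
      static String baseBucketName(String bucketFileName) {
        return bucketFileName.split(COPY_KEYWORD)[0];
      }

      public static void main(String[] args) {
        System.out.println(baseBucketName("000001_0"));         // 000001_0
        System.out.println(baseBucketName("000001_0_copy_2"));  // 000001_0
      }
    }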

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Fri Oct 24 02:42:19 2014
@@ -21,16 +21,22 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -38,6 +44,7 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -67,11 +74,36 @@ import com.google.common.collect.Multima
 import com.google.protobuf.ByteString;
 
 /*
- * Only works with old mapred API
- * Will only work with a single MRInput for now.
+ * This is the central piece for Bucket Map Join and SMB join. It has the following
+ * responsibilities:
+ * 1. Group incoming splits based on bucketing.
+ * 2. Generate new serialized events for the grouped splits.
+ * 3. Create a routing table for the bucket map join and send a serialized version as payload
+ * for the EdgeManager.
+ * 4. For SMB join, generate a grouping according to bucketing for the "small" table side.
  */
 public class CustomPartitionVertex extends VertexManagerPlugin {
 
+  public class PathComparatorForSplit implements Comparator<InputSplit> {
+
+    @Override
+    public int compare(InputSplit inp1, InputSplit inp2) {
+      FileSplit fs1 = (FileSplit) inp1;
+      FileSplit fs2 = (FileSplit) inp2;
+
+      int retval = fs1.getPath().compareTo(fs2.getPath());
+      if (retval != 0) {
+        return retval;
+      }
+
+      if (fs1.getStart() != fs2.getStart()) {
+        return (int) (fs1.getStart() - fs2.getStart());
+      }
+
+      return 0;
+    }
+  }
+
   private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
 
   VertexManagerPluginContext context;
@@ -89,6 +121,13 @@ public class CustomPartitionVertex exten
   private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
       new HashMap<String, Multimap<Integer, InputSplit>>();
 
+  private int numInputsAffectingRootInputSpecUpdate = 1;
+  private int numInputsSeenSoFar = 0;
+  private final Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
+  private final List<InputSplit> finalSplits = Lists.newLinkedList();
+  private final Map<String, InputSpecUpdate> inputNameInputSpecMap =
+      new HashMap<String, InputSpecUpdate>();
+
   public CustomPartitionVertex(VertexManagerPluginContext context) {
     super(context);
   }
@@ -108,12 +147,13 @@ public class CustomPartitionVertex exten
     this.numBuckets = vertexConf.getNumBuckets();
     this.mainWorkName = vertexConf.getInputName();
     this.vertexType = vertexConf.getVertexType();
+    this.numInputsAffectingRootInputSpecUpdate = vertexConf.getNumInputs();
   }
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     int numTasks = context.getVertexNumTasks(context.getVertexName());
-    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = 
+    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
       new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
     for (int i = 0; i < numTasks; ++i) {
       scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
@@ -133,8 +173,8 @@ public class CustomPartitionVertex exten
   @Override
   public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
       List<Event> events) {
+    numInputsSeenSoFar++;
     LOG.info("On root vertex initialized " + inputName);
-
     try {
       // This is using the payload from the RootVertexInitializer corresponding
       // to InputName. Ideally it should be using it's own configuration class -
@@ -168,18 +208,21 @@ public class CustomPartitionVertex exten
     }
 
     boolean dataInformationEventSeen = false;
-    Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
+    Map<String, Set<FileSplit>> pathFileSplitsMap = new TreeMap<String, Set<FileSplit>>();
 
     for (Event event : events) {
       if (event instanceof InputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state
         // check.
+        LOG.info("Got a input configure vertex event for input: " + inputName);
         Preconditions.checkState(dataInformationEventSeen == false);
         InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
         // build the routing table.
         configureVertexTaskEvent = cEvent;
+        LOG.info("Configure task for input name: " + inputName + " num tasks: "
+            + configureVertexTaskEvent.getNumTasks());
         dataInformationEvents =
             Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
       }
@@ -196,15 +239,20 @@ public class CustomPartitionVertex exten
         } catch (IOException e) {
           throw new RuntimeException("Failed to get file split for event: " + diEvent);
         }
-        List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath().getName());
+        Set<FileSplit> fsList =
+            pathFileSplitsMap.get(Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath()
+                .getName()));
         if (fsList == null) {
-          fsList = new ArrayList<FileSplit>();
-          pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
+          fsList = new TreeSet<FileSplit>(new PathComparatorForSplit());
+          pathFileSplitsMap.put(
+              Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath().getName()), fsList);
         }
         fsList.add(fileSplit);
       }
     }
 
+    LOG.info("Path file splits map for input name: " + inputName + " is " + pathFileSplitsMap);
+
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         getBucketSplitMapForPath(pathFileSplitsMap);
 
@@ -217,50 +265,88 @@ public class CustomPartitionVertex exten
 
       int availableSlots = totalResource / taskResource;
 
-      LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves + " waves.");
+      LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves
+          + " waves. Bucket initial splits map: " + bucketToInitialSplitMap);
       JobConf jobConf = new JobConf(conf);
       ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
           HashMultimap.<Integer, InputSplit> create();
-      for (Integer key : bucketToInitialSplitMap.keySet()) {
-        InputSplit[] inputSplitArray =
-            (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
-        Multimap<Integer, InputSplit> groupedSplit =
-            HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-                availableSlots, inputName);
-        bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
-      }
-
-      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
-      if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
+      boolean secondLevelGroupingDone = false;
+      if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) {
+        for (Integer key : bucketToInitialSplitMap.keySet()) {
+          InputSplit[] inputSplitArray =
+              (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+          HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator();
+          Multimap<Integer, InputSplit> groupedSplit =
+              hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+                  availableSlots, inputName, mainWorkName.isEmpty());
+          if (mainWorkName.isEmpty() == false) {
+            Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
+                HashMultimap.<Integer, InputSplit> create();
+            singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
+            groupedSplit =
+                grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
+                    HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
+            secondLevelGroupingDone = true;
+          }
+          bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+        }
+        processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone);
+      } else {
+        // do not group across files in case of side work because there is only 1 KV reader per
+        // grouped split. This would affect SMB joins where we want to find the smallest key in
+        // all the bucket files.
+        for (Integer key : bucketToInitialSplitMap.keySet()) {
+          HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator();
+          InputSplit[] inputSplitArray =
+              (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+          Multimap<Integer, InputSplit> groupedSplit =
+              hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+                    availableSlots, inputName, false);
+            bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+        }
         /*
-         * this is the small table side. In case of SMB join, we may need to send each split to the
+         * this is the small table side. In case of SMB join, we need to send each split to the
          * corresponding bucket-based task on the other side. In case a split needs to go to
          * multiple downstream tasks, we need to clone the event and send it to the right
          * destination.
          */
-        processAllSideEvents(inputName, bucketToGroupedSplitMap);
-      } else {
-        processAllEvents(inputName, bucketToGroupedSplitMap);
+        LOG.info("This is the side work - multi-mr work.");
+        processAllSideEventsSetParallelism(inputName, bucketToGroupedSplitMap);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  private void processAllSideEvents(String inputName,
+  private void processAllSideEventsSetParallelism(String inputName,
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
     // the bucket to task map should have been setup by the big table.
+    LOG.info("Processing events for input " + inputName);
     if (bucketToTaskMap.isEmpty()) {
+      LOG.info("We don't have a routing table yet. Will need to wait for the main input"
+          + " initialization");
       inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
       return;
     }
+    processAllSideEvents(inputName, bucketToGroupedSplitMap);
+    setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
+  }
+
+  private void processAllSideEvents(String inputName,
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
     List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>();
+    LOG.info("We have a routing table and we are going to set the destination tasks for the"
+        + " multi mr inputs. " + bucketToTaskMap);
+
+    Integer[] numSplitsForTask = new Integer[taskCount];
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
       Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
       for (Integer task : destTasks) {
+        int count = 0;
         for (InputSplit split : entry.getValue()) {
+          count++;
           MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
           InputDataInformationEvent diEvent =
               InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
@@ -268,26 +354,45 @@ public class CustomPartitionVertex exten
           diEvent.setTargetIndex(task);
           taskEvents.add(diEvent);
         }
+        numSplitsForTask[task] = count;
       }
     }
 
+    inputNameInputSpecMap.put(inputName,
+        InputSpecUpdate.createPerTaskInputSpecUpdate(Arrays.asList(numSplitsForTask)));
+
+    LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size());
+
     context.addRootInputEvents(inputName, taskEvents);
   }
 
   private void processAllEvents(String inputName,
-      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap, boolean secondLevelGroupingDone)
+      throws IOException {
 
-    List<InputSplit> finalSplits = Lists.newLinkedList();
+    int totalInputsCount = 0;
+    List<Integer> numSplitsForTask = new ArrayList<Integer>();
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
       Collection<InputSplit> initialSplits = entry.getValue();
       finalSplits.addAll(initialSplits);
-      for (int i = 0; i < initialSplits.size(); i++) {
+      for (InputSplit inputSplit : initialSplits) {
         bucketToTaskMap.put(bucketNum, taskCount);
+        if (secondLevelGroupingDone) {
+          TezGroupedSplit groupedSplit = (TezGroupedSplit) inputSplit;
+          numSplitsForTask.add(groupedSplit.getGroupedSplits().size());
+          totalInputsCount += groupedSplit.getGroupedSplits().size();
+        } else {
+          numSplitsForTask.add(1);
+          totalInputsCount += 1;
+        }
         taskCount++;
       }
     }
 
+    inputNameInputSpecMap.put(inputName,
+        InputSpecUpdate.createPerTaskInputSpecUpdate(numSplitsForTask));
+
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
     EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
@@ -297,7 +402,6 @@ public class CustomPartitionVertex exten
       UserPayload payload = getBytePayload(bucketToTaskMap);
       hiveEdgeManagerDesc.setUserPayload(payload);
     }
-    Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
     for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -308,42 +412,66 @@ public class CustomPartitionVertex exten
       }
     }
 
-    LOG.info("Task count is " + taskCount);
+    LOG.info("Task count is " + taskCount + " for input name: " + inputName);
 
-    List<InputDataInformationEvent> taskEvents =
-        Lists.newArrayListWithCapacity(finalSplits.size());
+    List<InputDataInformationEvent> taskEvents = Lists.newArrayListWithCapacity(totalInputsCount);
     // Re-serialize the splits after grouping.
     int count = 0;
     for (InputSplit inputSplit : finalSplits) {
-      MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
-      InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
-          count, serializedSplit.toByteString().asReadOnlyByteBuffer());
-      diEvent.setTargetIndex(count);
+      if (secondLevelGroupingDone) {
+        TezGroupedSplit tezGroupedSplit = (TezGroupedSplit)inputSplit;
+        for (InputSplit subSplit : tezGroupedSplit.getGroupedSplits()) {
+          if ((subSplit instanceof TezGroupedSplit) == false) {
+            throw new IOException("Unexpected split type found: "
+                + subSplit.getClass().getCanonicalName());
+          }
+          MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(subSplit);
+          InputDataInformationEvent diEvent =
+              InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit
+                  .toByteString().asReadOnlyByteBuffer());
+          diEvent.setTargetIndex(count);
+          taskEvents.add(diEvent);
+        }
+      } else {
+        MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+        InputDataInformationEvent diEvent =
+            InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit
+                .toByteString().asReadOnlyByteBuffer());
+        diEvent.setTargetIndex(count);
+        taskEvents.add(diEvent);
+      }
       count++;
-      taskEvents.add(diEvent);
-    }
-
-    // Replace the Edge Managers
-    Map<String, InputSpecUpdate> rootInputSpecUpdate =
-      new HashMap<String, InputSpecUpdate>();
-    rootInputSpecUpdate.put(
-        inputName,
-        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
-    if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
-      context.setVertexParallelism(
-          taskCount,
-          VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
-              .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
     }
 
     // Set the actual events for the tasks.
+    LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size());
     context.addRootInputEvents(inputName, taskEvents);
     if (inputToGroupedSplitMap.isEmpty() == false) {
       for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
         processAllSideEvents(entry.getKey(), entry.getValue());
       }
+      setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
       inputToGroupedSplitMap.clear();
     }
+
+    // Only done when it is a bucket map join only no SMB.
+    if (numInputsAffectingRootInputSpecUpdate == 1) {
+      setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
+    }
+  }
+
+  private void
+      setVertexParallelismAndRootInputSpec(Map<String, InputSpecUpdate> rootInputSpecUpdate)
+          throws IOException {
+    if (numInputsAffectingRootInputSpecUpdate != numInputsSeenSoFar) {
+      return;
+    }
+
+    LOG.info("Setting vertex parallelism since we have seen all inputs.");
+
+    context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
+        .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap,
+        rootInputSpecUpdate);
   }
 
   UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -377,14 +505,14 @@ public class CustomPartitionVertex exten
    * This method generates the map of bucket to file splits.
    */
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
-      Map<String, List<FileSplit>> pathFileSplitsMap) {
+      Map<String, Set<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
 
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
-    for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
+    for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
         bucketToInitialSplitMap.put(bucketId, fsplit);
@@ -392,6 +520,11 @@ public class CustomPartitionVertex exten
       bucketNum++;
     }
 
+    // this is just for SMB join use-case. The numBuckets would be equal to that of the big table
+    // and the small table could have lesser number of buckets. In this case, we want to send the
+    // data from the right buckets to the big table side. For e.g. Big table has 8 buckets and small
+    // table has 4 buckets, bucket 0 of small table needs to be sent to bucket 4 of the big table as
+    // well.
     if (bucketNum < numBuckets) {
       int loopedBucketId = 0;
       for (; bucketNum < numBuckets; bucketNum++) {
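
The comment above covers the SMB case where the small table has fewer buckets than the big table: with 8 big-table buckets and 4 small-table buckets, small-table bucket 0 must also feed big-table bucket 4. A standalone sketch of the wrap-around assignment that comment describes (plain java.util maps instead of the Guava multimap the class uses):

    import java.util.HashMap;
    import java.util.Map;

    public class SmallTableBucketRoutingSketch {
      // For every big-table bucket, pick the small-table bucket whose data it should read,
      // wrapping around when the small table has fewer buckets.
      static Map<Integer, Integer> routeSmallToBig(int bigTableBuckets, int smallTableBuckets) {
        Map<Integer, Integer> bigToSmall = new HashMap<Integer, Integer>();
        for (int bigBucket = 0; bigBucket < bigTableBuckets; bigBucket++) {
          bigToSmall.put(bigBucket, bigBucket % smallTableBuckets);
        }
        return bigToSmall;
      }

      public static void main(String[] args) {
        // The example from the comment above: 8 big-table buckets, 4 small-table buckets.
        Map<Integer, Integer> routing = routeSmallToBig(8, 4);
        System.out.println(routing.get(0)); // 0
        System.out.println(routing.get(4)); // 0 -> small-table bucket 0 also feeds big-table bucket 4
      }
    }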

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java Fri Oct 24 02:42:19 2014
@@ -29,20 +29,31 @@ import org.apache.hadoop.io.Writable;
  * This class is the payload for custom vertex. It serializes and de-serializes
  * @numBuckets: the number of buckets of the "big table"
  * @vertexType: this is the type of vertex and differentiates between bucket map join and SMB joins
- * @inputName: This is the name of the input. Used in case of SMB joins
+ * @numInputs: The number of inputs that are directly connected to the vertex (MRInput/MultiMRInput).
+ *             In case of bucket map join, it is always 1.
+ * @inputName: This is the name of the input. Used in case of SMB joins. Empty in case of BucketMapJoin
  */
 public class CustomVertexConfiguration implements Writable {
 
   private int numBuckets;
   private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES;
+  private int numInputs;
   private String inputName;
 
   public CustomVertexConfiguration() {
   }
 
-  public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) {
+  // this is the constructor to use for the Bucket map join case.
+  public CustomVertexConfiguration(int numBuckets, VertexType vertexType) {
+    this(numBuckets, vertexType, "", 1);
+  }
+
+  // this is the constructor to use for SMB.
+  public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName,
+      int numInputs) {
     this.numBuckets = numBuckets;
     this.vertexType = vertexType;
+    this.numInputs = numInputs;
     this.inputName = inputName;
   }
 
@@ -50,6 +61,7 @@ public class CustomVertexConfiguration i
   public void write(DataOutput out) throws IOException {
     out.writeInt(this.vertexType.ordinal());
     out.writeInt(this.numBuckets);
+    out.writeInt(numInputs);
     out.writeUTF(inputName);
   }
 
@@ -57,6 +69,7 @@ public class CustomVertexConfiguration i
   public void readFields(DataInput in) throws IOException {
     this.vertexType = VertexType.values()[in.readInt()];
     this.numBuckets = in.readInt();
+    this.numInputs = in.readInt();
     this.inputName = in.readUTF();
   }
 
@@ -71,4 +84,8 @@ public class CustomVertexConfiguration i
   public String getInputName() {
     return inputName;
   }
+
+  public int getNumInputs() {
+    return numInputs;
+  }
 }
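
The payload now carries four fields, and write() and readFields() have to agree on the order: vertex type ordinal, numBuckets, numInputs, inputName. A standalone round-trip sketch with plain java.io streams (not the Hadoop Writable interface) showing that ordering contract, with illustrative values:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class VertexPayloadSketch {
      public static void main(String[] args) throws IOException {
        // Write the fields in the order the patch uses:
        // vertexType ordinal, numBuckets, numInputs, inputName.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(2);            // vertexType ordinal (illustrative value)
        out.writeInt(8);            // numBuckets
        out.writeInt(3);            // numInputs (e.g. two merge works plus the main work)
        out.writeUTF("bigTable");   // inputName (hypothetical alias)
        out.close();

        // Read them back in the identical order; any mismatch corrupts every later field.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println("vertexType=" + in.readInt());
        System.out.println("numBuckets=" + in.readInt());
        System.out.println("numInputs=" + in.readInt());
        System.out.println("inputName=" + in.readUTF());
        in.close();
      }
    }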

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Oct 24 02:42:19 2014
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
 import javax.security.auth.login.LoginException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -49,7 +47,6 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
@@ -111,16 +108,13 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
 import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -271,8 +265,7 @@ public class DagUtils {
     case CUSTOM_EDGE: {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf =
-          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       VertexManagerPluginDescriptor desc =
@@ -317,8 +310,7 @@ public class DagUtils {
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf =
-          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
@@ -343,7 +335,6 @@ public class DagUtils {
   /*
    * Helper function to create an edge property from an edge type.
    */
-  @SuppressWarnings("rawtypes")
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
       throws IOException {
     MRHelpers.translateMRConfToTez(conf);
@@ -435,7 +426,7 @@ public class DagUtils {
       HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
       conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
     int cpus = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) > 0 ?
-      HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) : 
+      HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) :
       conf.getInt(MRJobConfig.MAP_CPU_VCORES, MRJobConfig.DEFAULT_MAP_CPU_VCORES);
     return Resource.newInstance(memory, cpus);
   }
@@ -489,13 +480,9 @@ public class DagUtils {
     if (mergeJoinWork.getMainWork() instanceof MapWork) {
       List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
       MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
-      CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
       Vertex mergeVx =
           createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
 
-      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
-      // here would cause pre-mature grouping which would be incorrect.
-      Class inputFormatClass = HiveInputFormat.class;
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set
       // to false when using this plug-in to avoid getting a serialized event at run-time.
@@ -512,9 +499,11 @@ public class DagUtils {
 
       VertexManagerPluginDescriptor desc =
         VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      // the +1 to the size is because of the main work.
       CustomVertexConfiguration vertexConf =
           new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
-              .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
+              .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias(),
+              mapWorkList.size() + 1);
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       byte[] userPayload = dob.getData();
@@ -554,6 +543,7 @@ public class DagUtils {
     DataSourceDescriptor dataSource;
 
     int numTasks = -1;
+    @SuppressWarnings("rawtypes")
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
@@ -611,7 +601,13 @@ public class DagUtils {
             .setCustomInitializerDescriptor(descriptor).build();
       } else {
         // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
-        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+        if (vertexHasCustomInput) {
+          dataSource =
+              MultiMRInput.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+        } else {
+          dataSource =
+              MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+        }
       }
     } else {
       // Setup client side split generation.
@@ -763,6 +759,7 @@ public class DagUtils {
    * @throws LoginException if we are unable to figure user information
    * @throws IOException when any dfs operation fails.
    */
+  @SuppressWarnings("deprecation")
   public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
     UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
     String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
@@ -875,6 +872,7 @@ public class DagUtils {
     return fstatus;
   }
 
+  @SuppressWarnings("deprecation")
   public static FileStatus validateTargetDir(Path path, Configuration conf) throws IOException {
     FileSystem fs = path.getFileSystem(conf);
     FileStatus fstatus = null;
@@ -1051,6 +1049,7 @@ public class DagUtils {
    * @param ctx This query's context
    * @return Vertex
    */
+  @SuppressWarnings("deprecation")
   public Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, LocalResource appJarLr,
       List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Oct 24 02:42:19 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +66,6 @@ import com.google.common.collect.Multima
  * making sure that splits from different partitions are only grouped if they
  * are of the same schema, format and serde
  */
-@SuppressWarnings("deprecation")
 public class HiveSplitGenerator extends InputInitializer {
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
@@ -72,11 +73,17 @@ public class HiveSplitGenerator extends 
   private static final SplitGrouper grouper = new SplitGrouper();
   private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
   private InputInitializerContext context;
+  private static Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
+      new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>();
 
   public HiveSplitGenerator(InputInitializerContext initializerContext) {
     super(initializerContext);
   }
 
+  public HiveSplitGenerator() {
+    this(null);
+  }
+
   @Override
   public List<Event> initialize() throws Exception {
     InputInitializerContext rootInputContext = getContext();
@@ -150,58 +157,28 @@ public class HiveSplitGenerator extends 
   }
 
 
-  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
       Configuration conf, InputSplit[] splits, float waves, int availableSlots)
       throws Exception {
-    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null);
+    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
   }
 
-  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      Configuration conf, InputSplit[] splits, float waves, int availableSlots,
-      String inputName) throws Exception {
-
-    MapWork work = null;
-    if (inputName != null) {
-      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
-      // work can still be null if there is no merge work for this input
-    }
-    if (work == null) {
-      work = Utilities.getMapWork(jobConf);
-    }
+  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+      Configuration conf, InputSplit[] splits, float waves, int availableSlots, String inputName,
+      boolean groupAcrossFiles) throws Exception {
 
+    MapWork work = populateMapWork(jobConf, inputName);
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
-    Class<?> previousInputFormatClass = null;
-    String previousDeserializerClass = null;
-    Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
-        new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>();
-
     int i = 0;
-
+    InputSplit prevSplit = null;
     for (InputSplit s : splits) {
       // this is the bit where we make sure we don't group across partition
       // schema boundaries
-
-      Path path = ((FileSplit) s).getPath();
-
-      PartitionDesc pd =
-          HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
-              path, cache);
-
-      String currentDeserializerClass = pd.getDeserializerClassName();
-      Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
-
-      if ((currentInputFormatClass != previousInputFormatClass)
-          || (!currentDeserializerClass.equals(previousDeserializerClass))) {
+      if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
         ++i;
-      }
-
-      previousInputFormatClass = currentInputFormatClass;
-      previousDeserializerClass = currentDeserializerClass;
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding split " + path + " to src group " + i);
+        prevSplit = s;
       }
       bucketSplitMultiMap.put(i, s);
     }
@@ -214,6 +191,54 @@ public class HiveSplitGenerator extends 
     return groupedSplits;
   }
 
+  private MapWork populateMapWork(JobConf jobConf, String inputName) {
+    MapWork work = null;
+    if (inputName != null) {
+      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
+      // work can still be null if there is no merge work for this input
+    }
+    if (work == null) {
+      work = Utilities.getMapWork(jobConf);
+    }
+
+    return work;
+  }
+
+  public boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupAcrossFiles,
+      MapWork work) throws IOException {
+    boolean retval = false;
+    Path path = ((FileSplit) s).getPath();
+    PartitionDesc pd =
+        HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+            path, cache);
+    String currentDeserializerClass = pd.getDeserializerClassName();
+    Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
+
+    Class<?> previousInputFormatClass = null;
+    String previousDeserializerClass = null;
+    if (prevSplit != null) {
+      Path prevPath = ((FileSplit) prevSplit).getPath();
+      if (!groupAcrossFiles) {
+        return !path.equals(prevPath);
+      }
+      PartitionDesc prevPD =
+          HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+              prevPath, cache);
+      previousDeserializerClass = prevPD.getDeserializerClassName();
+      previousInputFormatClass = prevPD.getInputFileFormatClass();
+    }
+
+    if ((currentInputFormatClass != previousInputFormatClass)
+        || (!currentDeserializerClass.equals(previousDeserializerClass))) {
+      retval = true;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding split " + path + " to src new group? " + retval);
+    }
+    return retval;
+  }
+
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
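
The refactored schemaEvolved() decides when the grouper has to start a new split group: with groupAcrossFiles disabled (the SMB side-work case) any change of file path opens a new group, otherwise only a change of input format or deserializer class does. A standalone sketch of that decision with simple stand-in fields instead of the PartitionDesc lookups:

    import java.util.Objects;

    public class GroupBoundarySketch {
      // Hypothetical stand-in for a split plus its partition metadata.
      static class SplitInfo {
        final String path;
        final String inputFormatClass;
        final String deserializerClass;
        SplitInfo(String path, String inputFormatClass, String deserializerClass) {
          this.path = path;
          this.inputFormatClass = inputFormatClass;
          this.deserializerClass = deserializerClass;
        }
      }

      // True when the current split must start a new group relative to the previous one.
      static boolean startsNewGroup(SplitInfo current, SplitInfo previous, boolean groupAcrossFiles) {
        if (previous == null) {
          return true; // the first split always opens a group
        }
        if (!groupAcrossFiles) {
          // SMB side work: never merge splits of different bucket files into one group.
          return !current.path.equals(previous.path);
        }
        // Otherwise group freely unless the partition schema (format/serde) changes.
        return !Objects.equals(current.inputFormatClass, previous.inputFormatClass)
            || !Objects.equals(current.deserializerClass, previous.deserializerClass);
      }

      public static void main(String[] args) {
        SplitInfo a = new SplitInfo("/warehouse/t/000000_0", "OrcInputFormat", "OrcSerde");
        SplitInfo b = new SplitInfo("/warehouse/t/000001_0", "OrcInputFormat", "OrcSerde");
        System.out.println(startsNewGroup(b, a, true));  // false: same format and serde
        System.out.println(startsNewGroup(b, a, false)); // true: different files, no cross-file grouping
      }
    }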

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Oct 24 02:42:19 2014
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,15 +44,15 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -73,6 +73,7 @@ public class MapRecordProcessor extends 
   private int position = 0;
   private boolean foundCachedMergeWork = false;
   MRInputLegacy legacyMRInput = null;
+  MultiMRInput mainWorkMultiMRInput = null;
   private ExecMapperContext execContext = null;
   private boolean abort = false;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
@@ -129,12 +130,14 @@ public class MapRecordProcessor extends 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
-    //Update JobConf using MRInput, info like filename comes via this
+    // Update JobConf using MRInput, info like filename comes via this
     legacyMRInput = getMRInput(inputs);
-    Configuration updatedConf = legacyMRInput.getConfigUpdates();
-    if (updatedConf != null) {
-      for (Entry<String, String> entry : updatedConf) {
-        jconf.set(entry.getKey(), entry.getValue());
+    if (legacyMRInput != null) {
+      Configuration updatedConf = legacyMRInput.getConfigUpdates();
+      if (updatedConf != null) {
+        for (Entry<String, String> entry : updatedConf) {
+          jconf.set(entry.getKey(), entry.getValue());
+        }
       }
     }
 
@@ -158,8 +161,6 @@ public class MapRecordProcessor extends 
       if (mergeWorkList != null) {
         MapOperator mergeMapOp = null;
         for (MapWork mergeMapWork : mergeWorkList) {
-          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs
-              .get(mergeMapWork.getName()))));
           if (mergeMapWork.getVectorMode()) {
             mergeMapOp = new VectorMapOperator();
           } else {
@@ -235,11 +236,17 @@ public class MapRecordProcessor extends 
   }
 
   private void initializeMapRecordSources() throws Exception {
+
     int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself
     sources = new MapRecordSource[size];
-    KeyValueReader reader = legacyMRInput.getReader();
     position = mapOp.getConf().getTag();
     sources[position] = new MapRecordSource();
+    KeyValueReader reader = null;
+    if (mainWorkMultiMRInput != null) {
+      reader = getKeyValueReader(mainWorkMultiMRInput.getKeyValueReaders(), mapOp);
+    } else {
+      reader = legacyMRInput.getReader();
+    }
     sources[position].init(jconf, mapOp, reader);
     for (MapOperator mapOp : mergeMapOpList) {
       int tag = mapOp.getConf().getTag();
@@ -248,13 +255,28 @@ public class MapRecordProcessor extends 
       MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
       Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
       l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
-      List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders);
-      reader = new KeyValueInputMerger(kvReaderList);
+      reader = getKeyValueReader(kvReaders, mapOp);
       sources[tag].init(jconf, mapOp, reader);
     }
     ((TezContext) MapredContext.get()).setRecordSources(sources);
   }
 
+  @SuppressWarnings("deprecation")
+  private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders,
+      MapOperator mapOp)
+      throws Exception {
+    List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders);
+    // this sets up the map operator contexts correctly
+    mapOp.initializeContexts();
+    Deserializer deserializer = mapOp.getCurrentDeserializer();
+    KeyValueReader reader =
+        new KeyValueInputMerger(kvReaderList, deserializer,
+            new ObjectInspector[] { deserializer.getObjectInspector() }, mapOp
+                .getConf()
+                .getSortCols());
+    return reader;
+  }
+
   private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) {
     for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) {
       if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
@@ -335,7 +357,17 @@ public class MapRecordProcessor extends 
         multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue());
       }
     }
-    theMRInput.init();
+    if (theMRInput != null) {
+      theMRInput.init();
+    } else {
+      String alias = mapWork.getAliasToWork().keySet().iterator().next();
+      if (inputs.get(alias) instanceof MultiMRInput) {
+        mainWorkMultiMRInput = (MultiMRInput) inputs.get(alias);
+      } else {
+        throw new IOException("Unexpected input type found: "
+            + inputs.get(alias).getClass().getCanonicalName());
+      }
+    }
     return theMRInput;
   }
 }
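
On the SMB side MultiMRInput supplies one reader per bucket file, and the CustomPartitionVertex comment earlier in this patch explains why those files are not grouped: the merge join needs the globally smallest key across all of them. KeyValueInputMerger performs that k-way merge over the Tez readers, now constructed with the deserializer and sort columns from getKeyValueReader(). A standalone sketch of the same idea over plain sorted iterators, using a priority queue keyed on each reader's head element:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KeyMergeSketch {

      // One entry per input reader: its current head key plus the iterator it came from.
      static class Head {
        String key;
        final Iterator<String> source;
        Head(String key, Iterator<String> source) {
          this.key = key;
          this.source = source;
        }
      }

      // Merge several individually sorted key streams into one globally sorted stream,
      // always emitting the smallest head key first.
      static List<String> mergeSorted(List<Iterator<String>> readers) {
        PriorityQueue<Head> queue = new PriorityQueue<Head>(Math.max(1, readers.size()),
            new Comparator<Head>() {
              @Override
              public int compare(Head h1, Head h2) {
                return h1.key.compareTo(h2.key);
              }
            });
        for (Iterator<String> reader : readers) {
          if (reader.hasNext()) {
            queue.add(new Head(reader.next(), reader));
          }
        }
        List<String> merged = new ArrayList<String>();
        while (!queue.isEmpty()) {
          Head smallest = queue.poll();      // the globally smallest current key
          merged.add(smallest.key);
          if (smallest.source.hasNext()) {   // refill from the reader that was just consumed
            smallest.key = smallest.source.next();
            queue.add(smallest);
          }
        }
        return merged;
      }

      public static void main(String[] args) {
        List<Iterator<String>> readers = Arrays.asList(
            Arrays.asList("a", "d", "g").iterator(),  // bucket file 1
            Arrays.asList("b", "e").iterator(),       // bucket file 2
            Arrays.asList("c", "f").iterator());      // bucket file 3
        System.out.println(mergeSorted(readers));     // [a, b, c, d, e, f, g]
      }
    }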

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java Fri Oct 24 02:42:19 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Fri Oct 24 02:42:19 2014
@@ -204,6 +204,7 @@ public class MergeFileRecordProcessor ex
   private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
     // there should be only one MRInput
     MRInputLegacy theMRInput = null;
+    LOG.info("VDK: the inputs are: " + inputs);
     for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
       if (inp.getValue() instanceof MRInputLegacy) {
         if (theMRInput != null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java Fri Oct 24 02:42:19 2014
@@ -102,9 +102,19 @@ public class SplitGrouper {
 
     // compute the total size per bucket
     long totalSize = 0;
+    boolean earlyExit = false;
     for (int bucketId : bucketSplitMap.keySet()) {
       long size = 0;
       for (InputSplit s : bucketSplitMap.get(bucketId)) {
+        // the incoming split may not be a file split when we are re-grouping TezGroupedSplits in
+        // the case of SMB join. So in this case, we can do an early exit by not doing the
+        // calculation for bucketSizeMap. Each bucket will assume it can fill availableSlots * waves
+        // (preset to 0.5) for SMB join.
+        if (!(s instanceof FileSplit)) {
+          bucketTaskMap.put(bucketId, (int) (availableSlots * waves));
+          earlyExit = true;
+          continue;
+        }
         FileSplit fsplit = (FileSplit) s;
         size += fsplit.getLength();
         totalSize += fsplit.getLength();
@@ -112,6 +122,10 @@ public class SplitGrouper {
       bucketSizeMap.put(bucketId, size);
     }
 
+    if (earlyExit) {
+      return bucketTaskMap;
+    }
+
     // compute the number of tasks
     for (int bucketId : bucketSizeMap.keySet()) {
       int numEstimatedTasks = 0;
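
The SplitGrouper change above short-circuits the per-bucket size accounting when the splits are already grouped (i.e. not FileSplits), assigning each such bucket availableSlots * waves tasks instead. A self-contained sketch of that early-exit logic, with stand-in split types and an illustrative size-based fallback (Hive's actual estimate sits below the truncated hunk and is not reproduced here):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BucketTaskSketch {
      interface Split {}
      static class FileSplit implements Split {            // stand-in for mapred FileSplit
        final long length;
        FileSplit(long length) { this.length = length; }
        long getLength() { return length; }
      }
      static class GroupedSplit implements Split {}        // stand-in for a TezGroupedSplit

      static Map<Integer, Integer> estimateTasks(Map<Integer, List<Split>> bucketSplitMap,
          int availableSlots, float waves) {
        Map<Integer, Integer> bucketTaskMap = new HashMap<Integer, Integer>();
        Map<Integer, Long> bucketSizeMap = new HashMap<Integer, Long>();
        long totalSize = 0;
        boolean earlyExit = false;
        for (Map.Entry<Integer, List<Split>> e : bucketSplitMap.entrySet()) {
          long size = 0;
          for (Split s : e.getValue()) {
            // Non-file splits mean we are re-grouping for SMB: assume the bucket can fill
            // availableSlots * waves tasks and skip the byte counting entirely.
            if (!(s instanceof FileSplit)) {
              bucketTaskMap.put(e.getKey(), (int) (availableSlots * waves));
              earlyExit = true;
              continue;
            }
            size += ((FileSplit) s).getLength();
            totalSize += ((FileSplit) s).getLength();
          }
          bucketSizeMap.put(e.getKey(), size);
        }
        if (earlyExit) {
          return bucketTaskMap;
        }
        // Illustrative fallback only: tasks proportional to each bucket's share of total bytes.
        for (Map.Entry<Integer, Long> e : bucketSizeMap.entrySet()) {
          double share = totalSize == 0 ? 0 : e.getValue() / (double) totalSize;
          bucketTaskMap.put(e.getKey(), Math.max(1, (int) (share * availableSlots * waves)));
        }
        return bucketTaskMap;
      }

      public static void main(String[] args) {
        Map<Integer, List<Split>> buckets = new HashMap<Integer, List<Split>>();
        buckets.put(0, Arrays.<Split>asList(new GroupedSplit()));
        System.out.println(estimateTasks(buckets, 10, 0.5f));  // {0=5}
      }
    }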

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Oct 24 02:42:19 2014
@@ -19,12 +19,9 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import java.io.IOException;
 import java.text.NumberFormat;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,11 +30,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -152,10 +148,13 @@ public class TezProcessor extends Abstra
       // Start the actual Inputs. After MRInput initialization.
       for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
         if (!cacheAccess.isInputCached(inputEntry.getKey())) {
-          LOG.info("Input: " + inputEntry.getKey() + " is not cached");
+          LOG.info("Starting input " + inputEntry.getKey());
           inputEntry.getValue().start();
+          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputEntry
+              .getValue())));
         } else {
-          LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+          LOG.info("Input: " + inputEntry.getKey()
+              + " is already cached. Skipping start and wait for ready");
         }
       }
 
@@ -194,6 +193,7 @@ public class TezProcessor extends Abstra
    * Must be initialized before it is used.
    *
    */
+  @SuppressWarnings("rawtypes")
   static class TezKVOutputCollector implements OutputCollector {
     private KeyValueWriter writer;
     private final LogicalOutput output;
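
The TezProcessor change above starts each non-cached input and then blocks on waitForAnyInputReady before record processing begins, rather than assuming the input is usable immediately after start(). A self-contained analogy of that start-then-wait handshake built on java.util.concurrent; FakeInput is hypothetical and not the Tez Input API:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class StartAndWaitSketch {
      static class FakeInput {                    // hypothetical; not Tez's Input
        private final CountDownLatch ready = new CountDownLatch(1);
        void start() {
          // pretend initialization completes asynchronously on another thread
          new Thread(new Runnable() {
            public void run() { ready.countDown(); }
          }).start();
        }
        boolean waitForReady(long timeout, TimeUnit unit) throws InterruptedException {
          return ready.await(timeout, unit);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        FakeInput input = new FakeInput();
        boolean cached = false;                   // cached inputs skip both start() and the wait
        if (!cached) {
          input.start();
          if (!input.waitForReady(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("input did not become ready");
          }
        }
        System.out.println("input ready, record processing can begin");
      }
    }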

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValueInputMerger.java Fri Oct 24 02:42:19 2014
@@ -18,13 +18,23 @@
 package org.apache.hadoop.hive.ql.exec.tez.tools;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.PriorityQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -34,16 +44,36 @@ import org.apache.tez.runtime.library.ap
  * Uses a priority queue to pick the KeyValuesReader of the input that is next in
  * sort order.
  */
+@SuppressWarnings("deprecation")
 public class KeyValueInputMerger extends KeyValueReader {
 
   public static final Log l4j = LogFactory.getLog(KeyValueInputMerger.class);
   private PriorityQueue<KeyValueReader> pQueue = null;
   private KeyValueReader nextKVReader = null;
+  private ObjectInspector[] inputObjInspectors = null;
+  private Deserializer deserializer = null;
+  private List<StructField> structFields = null;
+  private List<ObjectInspector> fieldOIs = null;
+  private final Map<KeyValueReader, List<Object>> kvReaderStandardObjMap =
+      new HashMap<KeyValueReader, List<Object>>();
 
-  public KeyValueInputMerger(List<KeyValueReader> multiMRInputs) throws Exception {
+  public KeyValueInputMerger(List<KeyValueReader> multiMRInputs, Deserializer deserializer,
+      ObjectInspector[] inputObjInspectors, List<String> sortCols) throws Exception {
     //get KeyValuesReaders from the LogicalInput and add them to priority queue
     int initialCapacity = multiMRInputs.size();
     pQueue = new PriorityQueue<KeyValueReader>(initialCapacity, new KVReaderComparator());
+    this.inputObjInspectors = inputObjInspectors;
+    this.deserializer = deserializer;
+    fieldOIs = new ArrayList<ObjectInspector>();
+    structFields = new ArrayList<StructField>();
+    StructObjectInspector structOI = (StructObjectInspector) inputObjInspectors[0];
+    for (String field : sortCols) {
+      StructField sf = structOI.getStructFieldRef(field);
+      structFields.add(sf);
+      ObjectInspector stdOI =
+          ObjectInspectorUtils.getStandardObjectInspector(sf.getFieldObjectInspector());
+      fieldOIs.add(stdOI);
+    }
     l4j.info("Initialized the priority queue with multi mr inputs: " + multiMRInputs.size());
     for (KeyValueReader input : multiMRInputs) {
       addToQueue(input);
@@ -58,6 +88,7 @@ public class KeyValueInputMerger extends
    */
   private void addToQueue(KeyValueReader kvReader) throws IOException {
     if (kvReader.next()) {
+      kvReaderStandardObjMap.remove(kvReader);
       pQueue.add(kvReader);
     }
   }
@@ -93,12 +124,53 @@ public class KeyValueInputMerger extends
    */
   class KVReaderComparator implements Comparator<KeyValueReader> {
 
+    @SuppressWarnings({ "unchecked" })
     @Override
     public int compare(KeyValueReader kvReadr1, KeyValueReader kvReadr2) {
       try {
-        BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentValue();
-        BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentValue();
-        return key1.compareTo(key2);
+        ObjectInspector oi = inputObjInspectors[0];
+        List<Object> row1, row2;
+        try {
+          if (kvReaderStandardObjMap.containsKey(kvReadr1)) {
+            row1 = kvReaderStandardObjMap.get(kvReadr1);
+          } else {
+            // we need to copy to standard object otherwise deserializer overwrites the values
+            row1 =
+                (List<Object>) ObjectInspectorUtils.copyToStandardObject(
+                    deserializer.deserialize((Writable) kvReadr1.getCurrentValue()), oi,
+                    ObjectInspectorCopyOption.WRITABLE);
+            kvReaderStandardObjMap.put(kvReadr1, row1);
+          }
+
+          if (kvReaderStandardObjMap.containsKey(kvReadr2)) {
+            row2 = kvReaderStandardObjMap.get(kvReadr2);
+          } else {
+            row2 =
+                (List<Object>) ObjectInspectorUtils.copyToStandardObject(
+                    deserializer.deserialize((Writable) kvReadr2.getCurrentValue()), oi,
+                    ObjectInspectorCopyOption.WRITABLE);
+            kvReaderStandardObjMap.put(kvReadr2, row2);
+          }
+        } catch (SerDeException e) {
+          throw new IOException(e);
+        }
+
+        StructObjectInspector structOI = (StructObjectInspector) oi;
+        int compare = 0;
+        int index = 0;
+        for (StructField sf : structFields) {
+          int pos = structOI.getAllStructFieldRefs().indexOf(sf);
+          Object key1 = row1.get(pos);
+          Object key2 = row2.get(pos);
+          ObjectInspector stdOI = fieldOIs.get(index);
+          compare = ObjectInspectorUtils.compare(key1, stdOI, key2, stdOI);
+          index++;
+          if (compare != 0) {
+            return compare;
+          }
+        }
+
+        return compare;
       } catch (IOException e) {
         l4j.error("Caught exception while reading shuffle input", e);
         //die!
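
The new comparator above orders the readers by their deserialized sort columns and caches a standard-object copy per reader (dropped in addToQueue when the reader advances), because the deserializer reuses its output object. A self-contained sketch of that merge-with-cached-copies pattern, using plain int[] rows in place of Hive rows and object inspectors:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.PriorityQueue;

    public class SortedReaderMergeSketch {
      // Stand-in for a KeyValueReader whose current value is backed by a reused buffer;
      // that is why the comparator below works on an immutable copy per reader.
      static class Reader {
        private final Iterator<int[]> rows;       // each row holds only the sort-key columns
        private int[] current;
        Reader(List<int[]> sorted) { this.rows = sorted.iterator(); }
        boolean next() { if (!rows.hasNext()) return false; current = rows.next(); return true; }
        int[] currentValue() { return current; }
      }

      private final Map<Reader, int[]> copies = new HashMap<Reader, int[]>();
      private final PriorityQueue<Reader> queue;

      SortedReaderMergeSketch(List<Reader> readers) {
        queue = new PriorityQueue<Reader>(Math.max(1, readers.size()), new Comparator<Reader>() {
          public int compare(Reader a, Reader b) {
            int[] ra = copyOf(a), rb = copyOf(b);
            for (int i = 0; i < ra.length; i++) {  // column by column, like the sort cols
              int c = Integer.compare(ra[i], rb[i]);
              if (c != 0) return c;
            }
            return 0;
          }
        });
        for (Reader r : readers) add(r);
      }

      private int[] copyOf(Reader r) {
        int[] copy = copies.get(r);
        if (copy == null) { copy = r.currentValue().clone(); copies.put(r, copy); }
        return copy;
      }

      private void add(Reader r) {
        if (r.next()) { copies.remove(r); queue.add(r); }  // stale copy dropped on advance
      }

      int[] nextRow() {
        Reader r = queue.poll();
        if (r == null) return null;
        int[] row = r.currentValue().clone();
        add(r);
        return row;
      }

      public static void main(String[] args) {
        Reader r1 = new Reader(Arrays.asList(new int[] {1}, new int[] {4}));
        Reader r2 = new Reader(Arrays.asList(new int[] {2}, new int[] {3}));
        SortedReaderMergeSketch m = new SortedReaderMergeSketch(Arrays.asList(r1, r2));
        for (int[] row; (row = m.nextRow()) != null; ) System.out.println(row[0]);  // 1 2 3 4
      }
    }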

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Fri Oct 24 02:42:19 2014
@@ -21,11 +21,7 @@ package org.apache.hadoop.hive.ql.io;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
-import org.apache.hadoop.hive.ql.session.SessionState;
 
 
 /**
@@ -44,8 +40,6 @@ public class IOContext {
  };
 
   private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-  private static IOContext ioContext = new IOContext();
-
   public static Map<String, IOContext> getMap() {
     return inputNameIOContextMap;
   }
@@ -61,7 +55,7 @@ public class IOContext {
 
   public static void clear() {
     IOContext.threadLocal.remove();
-    ioContext = new IOContext();
+    inputNameIOContextMap.clear();
   }
 
   long currentBlockStart;
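
With the static singleton removed, clear() above now empties the per-input-name map itself. A small sketch of that registry pattern, with hypothetical names and ignoring IOContext's thread-local path:

    import java.util.HashMap;
    import java.util.Map;

    public class ContextRegistrySketch {
      static class Context { long currentBlockStart; }

      private static final Map<String, Context> byInputName = new HashMap<String, Context>();

      static synchronized Context get(String inputName) {
        Context ctx = byInputName.get(inputName);
        if (ctx == null) { ctx = new Context(); byInputName.put(inputName, ctx); }
        return ctx;
      }

      static synchronized void clear() {
        byInputName.clear();                     // drop all per-input state between tasks
      }

      public static void main(String[] args) {
        get("input1").currentBlockStart = 42L;
        clear();
        System.out.println(get("input1").currentBlockStart);  // 0: state was reset
      }
    }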

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Fri Oct 24 02:42:19 2014
@@ -85,13 +85,18 @@ public class ConvertJoinMapJoin implemen
 
     JoinOperator joinOp = (JoinOperator) nd;
 
-    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
-        && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
+    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-      return null;
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      if (retval == null) {
+        return retval;
+      } else {
+        int pos = 0; // it doesn't matter which position we use in this case.
+        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+        return null;
+      }
     }
 
     // if we have traits, and table info is present in the traits, we know the
@@ -99,7 +104,6 @@ public class ConvertJoinMapJoin implemen
     // reducers from the parent operators.
     int numBuckets = -1;
     int estimatedBuckets = -1;
-    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
       for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
         if (parentOp.getOpTraits().getNumBuckets() > 0) {
@@ -126,53 +130,15 @@ public class ConvertJoinMapJoin implemen
     LOG.info("Estimated number of buckets " + numBuckets);
     int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
     if (mapJoinConversionPos < 0) {
-      // we cannot convert to bucket map join, we cannot convert to
-      // map join either based on the size. Check if we can convert to SMB join.
-      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
-        convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
-        return null;
-      }
-      Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
-      try {
-        bigTableMatcherClass =
-            (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
-                context.parseContext.getConf(),
-                HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
-      } catch (ClassNotFoundException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      BigTableSelectorForAutoSMJ bigTableMatcher =
-          ReflectionUtils.newInstance(bigTableMatcherClass, null);
-      JoinDesc joinDesc = joinOp.getConf();
-      JoinCondDesc[] joinCondns = joinDesc.getConds();
-      Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
-      if (joinCandidates.isEmpty()) {
-        // This is a full outer join. This can never be a map-join
-        // of any type. So return false.
-        return false;
-      }
-      mapJoinConversionPos =
-          bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
-      if (mapJoinConversionPos < 0) {
-        // contains aliases from sub-query
-        // we are just converting to a common merge join operator. The shuffle
-        // join in map-reduce case.
-        int pos = 0; // it doesn't matter which position we use in this case.
-        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-        return null;
-      }
-
-      if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
-        convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
-            tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      if (retval == null) {
+        return retval;
       } else {
-        // we are just converting to a common merge join operator. The shuffle
-        // join in map-reduce case.
-        int pos = 0; // it doesn't matter which position we use in this case.
-        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+          // only case is full outer join with SMB enabled which is not possible. Convert to regular
+          // join.
+          convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+          return null;
       }
-      return null;
     }
 
     if (numBuckets > 1) {
@@ -206,6 +172,57 @@ public class ConvertJoinMapJoin implemen
     return null;
   }
 
+  private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
+      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+    // we cannot convert to bucket map join, we cannot convert to
+    // map join either based on the size. Check if we can convert to SMB join.
+    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+      convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+      return null;
+    }
+    Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+    try {
+      bigTableMatcherClass =
+          (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
+              context.parseContext.getConf(),
+              HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+    } catch (ClassNotFoundException e) {
+      throw new SemanticException(e.getMessage());
+    }
+
+    BigTableSelectorForAutoSMJ bigTableMatcher =
+        ReflectionUtils.newInstance(bigTableMatcherClass, null);
+    JoinDesc joinDesc = joinOp.getConf();
+    JoinCondDesc[] joinCondns = joinDesc.getConds();
+    Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
+    if (joinCandidates.isEmpty()) {
+      // This is a full outer join. This can never be a map-join
+      // of any type. So return false.
+      return false;
+    }
+    int mapJoinConversionPos =
+        bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
+    if (mapJoinConversionPos < 0) {
+      // contains aliases from sub-query
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+      return null;
+    }
+
+    if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+      convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+          tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+    } else {
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+    }
+    return null;
+}
+
   // replaces the join operator with a new CommonJoinOperator, removes the
   // parent reduce sinks
   private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
@@ -630,7 +647,7 @@ public class ConvertJoinMapJoin implemen
           hasDynamicPartitionPruning = true;
           break;
         }
-      
+ 
         if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
           // crossing reduce sink or file sink means the pruning isn't for this parent.
           break;
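
The refactor above pulls the SMB fallback logic into checkAndConvertSMBJoin, which either converts to an SMB join, falls back to a common merge join, or reports the full outer join case back to the caller (which then converts to a regular join). A compact sketch of that decision cascade, using hypothetical types rather than Hive's operators:

    public class SmbDecisionSketch {
      enum JoinChoice { SMB_JOIN, COMMON_MERGE_JOIN, NOT_CONVERTIBLE }

      static JoinChoice decide(boolean autoSmbEnabled, boolean fullOuterJoin,
          int bigTablePosition, boolean smbCheckPasses) {
        if (!autoSmbEnabled) {
          return JoinChoice.COMMON_MERGE_JOIN;   // shuffle-style merge join
        }
        if (fullOuterJoin) {
          return JoinChoice.NOT_CONVERTIBLE;     // full outer join: never a map-side join
        }
        if (bigTablePosition < 0) {
          return JoinChoice.COMMON_MERGE_JOIN;   // big-table selector found no candidate
        }
        return smbCheckPasses ? JoinChoice.SMB_JOIN : JoinChoice.COMMON_MERGE_JOIN;
      }

      public static void main(String[] args) {
        System.out.println(decide(true, false, 1, true));    // SMB_JOIN
        System.out.println(decide(true, true, -1, false));   // NOT_CONVERTIBLE
      }
    }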

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Fri Oct 24 02:42:19 2014
@@ -1,7 +1,5 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Fri Oct 24 02:42:19 2014
@@ -123,6 +123,11 @@ public class GenTezWork implements NodeP
       context.rootToWorkMap.put(root, work);
     }
 
+    // this is where we set the sort columns that we will be using for KeyValueInputMerge
+    if (operator instanceof DummyStoreOperator) {
+      work.addSortCols(root.getOpTraits().getSortCols().get(0));
+    }
+
     if (!context.childToWorkMap.containsKey(operator)) {
       List<BaseWork> workItems = new LinkedList<BaseWork>();
       workItems.add(work);
@@ -148,6 +153,7 @@ public class GenTezWork implements NodeP
         context.opMergeJoinWorkMap.put(context.currentMergeJoinOperator, mergeJoinWork);
       }
       // connect the work correctly.
+      work.addSortCols(root.getOpTraits().getSortCols().get(0));
       mergeJoinWork.addMergedWork(work, null);
       Operator<? extends OperatorDesc> parentOp =
           getParentFromStack(context.currentMergeJoinOperator, stack);
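
Together with the BaseWork change below, the lines above copy the sort columns from the operator's traits onto the work object so the runtime (the KeyValueInputMerger constructor earlier in this commit) can ask for them later. A tiny sketch of that plumbing, with a hypothetical Work class mirroring addSortCols/getSortCols:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SortColPlumbingSketch {
      static class Work {
        private final List<String> sortColNames = new ArrayList<String>();
        void addSortCols(List<String> cols) { sortColNames.addAll(cols); }
        List<String> getSortCols() { return sortColNames; }
      }

      public static void main(String[] args) {
        List<List<String>> traitSortCols =
            Arrays.asList(Arrays.asList("key", "value"));   // what OpTraits would carry
        Work work = new Work();
        work.addSortCols(traitSortCols.get(0));             // mirrors work.addSortCols(...)
        System.out.println(work.getSortCols());             // [key, value]
      }
    }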

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1633988&r1=1633987&r2=1633988&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Fri Oct 24 02:42:19 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -42,6 +43,7 @@ public abstract class BaseWork extends A
   // schema info.
   List<HashTableDummyOperator> dummyOps;
   int tag;
+  private final List<String> sortColNames = new ArrayList<String>();
 
   public BaseWork() {}
 
@@ -148,4 +150,12 @@ public abstract class BaseWork extends A
   public int getTag() {
     return tag;
   }
+
+  public void addSortCols(List<String> sortCols) {
+    this.sortColNames.addAll(sortCols);
+  }
+
+  public List<String> getSortCols() {
+    return sortColNames;
+  }
 }


