hive-commits mailing list archives

From gunt...@apache.org
Subject svn commit: r1627235 [3/9] - in /hive/trunk: itests/src/test/resources/ itests/util/src/main/java/org/apache/hadoop/hive/ql/ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift...
Date Wed, 24 Sep 2014 07:03:38 GMT
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/KeyValuesInputMerger.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez.tools;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * A KeyValuesReader implementation that returns a sorted stream of key-values
+ * by doing a sorted merge of the key-values in the LogicalInputs.
+ * Tags are in the last byte of the key, so no special handling for tags is required.
+ * Uses a priority queue to pick the KeyValuesReader of the input that is next in
+ * sort order.
+ */
+public class KeyValuesInputMerger extends KeyValuesReader {
+
+  private class KeyValuesIterable implements Iterable<Object> {
+
+    KeyValuesIterator currentIterator = null;
+
+    KeyValuesIterable(int size) {
+      currentIterator = new KeyValuesIterator(size);
+    }
+
+    @Override
+    public Iterator<Object> iterator() {
+      return currentIterator;
+    }
+
+    public void init(List<KeyValuesReader> readerList) {
+      currentIterator.init(readerList);
+    }
+  }
+
+  private class KeyValuesIterator implements Iterator<Object> {
+    KeyValuesReader[] readerArray = null;
+    Iterator<Object> currentIterator = null;
+    int currentIndex = 0;
+    int loadedSize = 0;
+
+    KeyValuesIterator(int size) {
+      readerArray = new KeyValuesReader[size];
+    }
+
+    public void init(List<KeyValuesReader> readerList) {
+      for (int i = 0; i < readerList.size(); i++) {
+        readerArray[i] = null;
+      }
+      loadedSize = 0;
+      for (KeyValuesReader kvsReader : readerList) {
+        readerArray[loadedSize] = kvsReader;
+        loadedSize++;
+      }
+      currentIterator = null;
+      currentIndex = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if ((currentIterator == null) || (currentIterator.hasNext() == false)) {
+        if (currentIndex == loadedSize) {
+          return false;
+        }
+
+        try {
+          if (readerArray[currentIndex] == null) {
+            return false;
+          }
+          currentIterator = readerArray[currentIndex].getCurrentValues().iterator();
+          currentIndex++;
+          return currentIterator.hasNext();
+        } catch (IOException e) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    @Override
+    public Object next() {
+      l4j.info("next called on " + currentIterator);
+      return currentIterator.next();
+    }
+
+    @Override
+    public void remove() {
+      // nothing to do
+    }
+  }
+
+  public static final Log l4j = LogFactory.getLog(KeyValuesInputMerger.class);
+  private PriorityQueue<KeyValuesReader> pQueue = null;
+  private final List<KeyValuesReader> nextKVReaders = new ArrayList<KeyValuesReader>();
+  KeyValuesIterable kvsIterable = null;
+
+  public KeyValuesInputMerger(List<? extends Input> shuffleInputs) throws Exception {
+    //get KeyValuesReaders from the LogicalInput and add them to priority queue
+    int initialCapacity = shuffleInputs.size();
+    kvsIterable = new KeyValuesIterable(initialCapacity);
+    pQueue = new PriorityQueue<KeyValuesReader>(initialCapacity, new KVReaderComparator());
+    for(Input input : shuffleInputs){
+      addToQueue((KeyValuesReader)input.getReader());
+    }
+  }
+
+  /**
+   * Add the KeyValuesReader to the queue if it has more key-values
+   * @param kvsReadr
+   * @throws IOException
+   */
+  private void addToQueue(KeyValuesReader kvsReadr) throws IOException{
+    if(kvsReadr.next()){
+      pQueue.add(kvsReadr);
+    }
+  }
+
+  /**
+   * @return true if there are more key-values and advances to next key-values
+   * @throws IOException
+   */
+  @Override
+  public boolean next() throws IOException {
+    //add the previous nextKVReader back to queue
+    if (!nextKVReaders.isEmpty()) {
+      for (KeyValuesReader kvReader : nextKVReaders) {
+        addToQueue(kvReader);
+      }
+      nextKVReaders.clear();
+    }
+
+    KeyValuesReader nextKVReader = null;
+    //get the new nextKVReader with lowest key
+    nextKVReader = pQueue.poll();
+    if (nextKVReader != null) {
+      nextKVReaders.add(nextKVReader);
+    }
+
+    while (pQueue.peek() != null) {
+      KeyValuesReader equalValueKVReader = pQueue.poll();
+      if (pQueue.comparator().compare(nextKVReader, equalValueKVReader) == 0) {
+        nextKVReaders.add(equalValueKVReader);
+      } else {
+        pQueue.add(equalValueKVReader);
+        break;
+      }
+    }
+    return !(nextKVReaders.isEmpty());
+  }
+
+  @Override
+  public Object getCurrentKey() throws IOException {
+    // return key from any of the readers
+    return nextKVReaders.get(0).getCurrentKey();
+  }
+
+  @Override
+  public Iterable<Object> getCurrentValues() throws IOException {
+    kvsIterable.init(nextKVReaders);
+    return kvsIterable;
+  }
+
+  /**
+   * Comparator that compares KeyValuesReader on their current key
+   */
+  class KVReaderComparator implements Comparator<KeyValuesReader> {
+
+    @Override
+    public int compare(KeyValuesReader kvReadr1, KeyValuesReader kvReadr2) {
+      try {
+        BinaryComparable key1 = (BinaryComparable) kvReadr1.getCurrentKey();
+        BinaryComparable key2 = (BinaryComparable) kvReadr2.getCurrentKey();
+        return key1.compareTo(key2);
+      } catch (IOException e) {
+        l4j.error("Caught exception while reading shuffle input", e);
+        //die!
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+
+}

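For illustration, a minimal sketch of how a caller might drive the new merger. The shuffleInputs list and the consume() callback are hypothetical; the next()/getCurrentKey()/getCurrentValues() protocol is the one implemented above.

    import java.util.List;
    import org.apache.tez.runtime.api.Input;
    import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValuesInputMerger;

    class MergerUsageSketch {
      static void drain(List<? extends Input> shuffleInputs) throws Exception {
        KeyValuesInputMerger merger = new KeyValuesInputMerger(shuffleInputs);
        while (merger.next()) {                             // advances to the next-lowest key
          Object key = merger.getCurrentKey();              // key shared by all readers at the queue head
          for (Object value : merger.getCurrentValues()) {  // values from each reader positioned on that key
            consume(key, value);                            // hypothetical consumer
          }
        }
      }

      static void consume(Object key, Object value) { /* application-specific */ }
    }
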
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/tools/TezMergedLogicalInput.java Wed Sep 24 07:03:35 2014
@@ -40,7 +40,7 @@ public class TezMergedLogicalInput exten
  
   @Override
   public Reader getReader() throws Exception {
-    return new InputMerger(getInputs());
+    return new KeyValuesInputMerger(getInputs());
   }
 
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Wed Sep 24 07:03:35 2014
@@ -161,10 +161,11 @@ public abstract class HiveContextAwareRe
   }
 
   public IOContext getIOContext() {
-    return IOContext.get();
+    return IOContext.get(jobConf.get(Utilities.INPUT_NAME));
   }
 
-  public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
+  private void initIOContext(long startPos, boolean isBlockPointer,
+      Path inputPath) {
     ioCxtRef = this.getIOContext();
     ioCxtRef.currentBlockStart = startPos;
     ioCxtRef.isBlockPointer = isBlockPointer;
@@ -183,7 +184,7 @@ public abstract class HiveContextAwareRe
 
     boolean blockPointer = false;
     long blockStart = -1;
-    FileSplit fileSplit = (FileSplit) split;
+    FileSplit fileSplit = split;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(job);
     if (inputFormatClass.getName().contains("SequenceFile")) {
@@ -202,12 +203,15 @@ public abstract class HiveContextAwareRe
       blockStart = in.getPosition();
       in.close();
     }
+    this.jobConf = job;
     this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
 
     this.initIOContextSortedProps(split, recordReader, job);
   }
 
   public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) {
+    this.jobConf = job;
+
     this.getIOContext().resetSortingValues();
     this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Wed Sep 24 07:03:35 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -253,7 +254,14 @@ public class HiveInputFormat<K extends W
   }
 
   protected void init(JobConf job) {
-    mrwork = Utilities.getMapWork(job);
+    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+      mrwork = (MapWork) Utilities.getMergeWork(job);
+      if (mrwork == null) {
+        mrwork = Utilities.getMapWork(job);
+      }
+    } else {
+      mrwork = Utilities.getMapWork(job);
+    }
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Wed Sep 24 07:03:35 2014
@@ -18,7 +18,13 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
 
@@ -32,20 +38,25 @@ import org.apache.hadoop.hive.ql.session
  */
 public class IOContext {
 
-
   private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
     @Override
     protected synchronized IOContext initialValue() { return new IOContext(); }
  };
 
- private static IOContext ioContext = new IOContext();
+  private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
+  private static IOContext ioContext = new IOContext();
 
-  public static IOContext get() {
-    if (SessionState.get() == null) {
-      // this happens on the backend. only one io context needed.
-      return ioContext;
+  public static Map<String, IOContext> getMap() {
+    return inputNameIOContextMap;
+  }
+
+  public static IOContext get(String inputName) {
+    if (inputNameIOContextMap.containsKey(inputName) == false) {
+      IOContext ioContext = new IOContext();
+      inputNameIOContextMap.put(inputName, ioContext);
     }
-    return IOContext.threadLocal.get();
+
+    return inputNameIOContextMap.get(inputName);
   }
 
   public static void clear() {

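For illustration, a minimal sketch of the new per-input-name lookup in IOContext. The input names are made up; IOContext.get(String) is the method added in this hunk, which creates and caches one context per input name in a static map.

    import org.apache.hadoop.hive.ql.io.IOContext;

    class IOContextLookupSketch {
      static void demo() {
        IOContext first  = IOContext.get("Map 1");  // created and cached under "Map 1"
        IOContext second = IOContext.get("Map 2");  // a separate context for a second input
        IOContext again  = IOContext.get("Map 1");  // returns the cached instance for "Map 1"
        assert first == again && first != second;
      }
    }
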
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Wed Sep 24 07:03:35 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,12 +30,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -42,12 +47,16 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
@@ -60,39 +69,46 @@ public class ConvertJoinMapJoin implemen
 
   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
 
+  @SuppressWarnings("unchecked")
   @Override
-    /*
-     * (non-Javadoc)
-     * we should ideally not modify the tree we traverse.
-     * However, since we need to walk the tree at any time when we modify the
-     * operator, we might as well do it here.
-     */
-    public Object process(Node nd, Stack<Node> stack,
-        NodeProcessorCtx procCtx, Object... nodeOutputs)
-    throws SemanticException {
+  /*
+   * (non-Javadoc) we should ideally not modify the tree we traverse. However,
+   * since we need to walk the tree at any time when we modify the operator, we
+   * might as well do it here.
+   */
+  public Object
+      process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
 
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
-    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
+    JoinOperator joinOp = (JoinOperator) nd;
+
+    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
+        && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
-    JoinOperator joinOp = (JoinOperator) nd;
-    // if we have traits, and table info is present in the traits, we know the 
+    // if we have traits, and table info is present in the traits, we know the
     // exact number of buckets. Else choose the largest number of estimated
     // reducers from the parent operators.
     int numBuckets = -1;
     int estimatedBuckets = -1;
+    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
       for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
         if (parentOp.getOpTraits().getNumBuckets() > 0) {
-          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? 
-              parentOp.getOpTraits().getNumBuckets() : numBuckets; 
+          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+              parentOp.getOpTraits().getNumBuckets() : numBuckets;
         }
 
         if (parentOp instanceof ReduceSinkOperator) {
           ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
-          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? 
+          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
               rs.getConf().getNumReducers() : estimatedBuckets;
         }
       }
@@ -107,29 +123,80 @@ public class ConvertJoinMapJoin implemen
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
     if (mapJoinConversionPos < 0) {
-      // we cannot convert to bucket map join, we cannot convert to 
-      // map join either based on the size
+      // we cannot convert to bucket map join, we cannot convert to
+      // map join either based on the size. Check if we can convert to SMB join.
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+        convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+        return null;
+      }
+      Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+      try {
+        bigTableMatcherClass =
+            (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
+                context.parseContext.getConf(),
+                HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+      } catch (ClassNotFoundException e) {
+        throw new SemanticException(e.getMessage());
+      }
+
+      BigTableSelectorForAutoSMJ bigTableMatcher =
+          ReflectionUtils.newInstance(bigTableMatcherClass, null);
+      JoinDesc joinDesc = joinOp.getConf();
+      JoinCondDesc[] joinCondns = joinDesc.getConds();
+      Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
+      if (joinCandidates.isEmpty()) {
+        // This is a full outer join. This can never be a map-join
+        // of any type. So return false.
+        return false;
+      }
+      mapJoinConversionPos =
+          bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
+      if (mapJoinConversionPos < 0) {
+        // contains aliases from sub-query
+        // we are just converting to a common merge join operator. The shuffle
+        // join in map-reduce case.
+        int pos = 0; // it doesn't matter which position we use in this case.
+        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+        return null;
+      }
+
+      if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+        convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+            tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+      } else {
+        // we are just converting to a common merge join operator. The shuffle
+        // join in map-reduce case.
+        int pos = 0; // it doesn't matter which position we use in this case.
+        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+      }
       return null;
     }
 
-    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
-      if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
-        return null;
+    if (numBuckets > 1) {
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+        if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+          return null;
+        }
       }
     }
 
     LOG.info("Convert to non-bucketed map join");
     // check if we can convert to map join no bucket scaling.
-    mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1);
+    mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
     if (mapJoinConversionPos < 0) {
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
     // map join operator by default has no bucket cols
-    mapJoinOp.setOpTraits(new OpTraits(null, -1));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+    mapJoinOp.setStatistics(joinOp.getStatistics());
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
       setAllChildrenTraitsToNull(childOp);
@@ -138,11 +205,107 @@ public class ConvertJoinMapJoin implemen
     return null;
   }
 
+  // replaces the join operator with a new CommonJoinOperator, removes the
+  // parent reduce sinks
+  private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
+      throws SemanticException {
+    ParseContext parseContext = context.parseContext;
+    MapJoinDesc mapJoinDesc = null;
+    if (adjustParentsChildren) {
+        mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+            joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+    } else {
+      JoinDesc joinDesc = joinOp.getConf();
+      // retain the original join desc in the map join.
+      mapJoinDesc =
+          new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
+              joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
+              joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+    }
+
+    @SuppressWarnings("unchecked")
+    CommonMergeJoinOperator mergeJoinOp =
+        (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
+            isSubQuery, mapJoinConversionPos, mapJoinDesc));
+    OpTraits opTraits =
+        new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
+            .getSortCols());
+    mergeJoinOp.setOpTraits(opTraits);
+    mergeJoinOp.setStatistics(joinOp.getStatistics());
+
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      int pos = parentOp.getChildOperators().indexOf(joinOp);
+      parentOp.getChildOperators().remove(pos);
+      parentOp.getChildOperators().add(pos, mergeJoinOp);
+    }
+
+    for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
+      int pos = childOp.getParentOperators().indexOf(joinOp);
+      childOp.getParentOperators().remove(pos);
+      childOp.getParentOperators().add(pos, mergeJoinOp);
+    }
+
+    List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
+    if (childOperators == null) {
+      childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+      mergeJoinOp.setChildOperators(childOperators);
+    }
+
+    List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
+    if (parentOperators == null) {
+      parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+      mergeJoinOp.setParentOperators(parentOperators);
+    }
+
+    childOperators.clear();
+    parentOperators.clear();
+    childOperators.addAll(joinOp.getChildOperators());
+    parentOperators.addAll(joinOp.getParentOperators());
+    mergeJoinOp.getConf().setGenJoinKeys(false);
+
+    if (adjustParentsChildren) {
+      mergeJoinOp.getConf().setGenJoinKeys(true);
+      List<Operator<? extends OperatorDesc>> newParentOpList =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
+        for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
+          grandParentOp.getChildOperators().remove(parentOp);
+          grandParentOp.getChildOperators().add(mergeJoinOp);
+          newParentOpList.add(grandParentOp);
+        }
+      }
+      mergeJoinOp.getParentOperators().clear();
+      mergeJoinOp.getParentOperators().addAll(newParentOpList);
+      List<Operator<? extends OperatorDesc>> parentOps =
+          new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
+      for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+        int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
+        if (parentIndex == mapJoinConversionPos) {
+          continue;
+        }
+
+        // insert the dummy store operator here
+        DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+        dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+        dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+        dummyStoreOp.getChildOperators().add(mergeJoinOp);
+        int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
+        parentOp.getChildOperators().remove(index);
+        parentOp.getChildOperators().add(index, dummyStoreOp);
+        dummyStoreOp.getParentOperators().add(parentOp);
+        mergeJoinOp.getParentOperators().remove(parentIndex);
+        mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
+      }
+    }
+    mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
+  }
+
   private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
     if (currentOp instanceof ReduceSinkOperator) {
       return;
     }
-    currentOp.setOpTraits(new OpTraits(null, -1));
+    currentOp.setOpTraits(new OpTraits(null, -1, null));
     for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
         break;
@@ -151,28 +314,26 @@ public class ConvertJoinMapJoin implemen
     }
   }
 
-  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
-      int bigTablePosition) throws SemanticException {
-
-    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
 
     if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
       LOG.info("Check conversion to bucket map join failed.");
       return false;
     }
 
-    MapJoinOperator mapJoinOp = 
-      convertJoinMapJoin(joinOp, context, bigTablePosition);
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
 
     // we can set the traits for this join operator
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
-        tezBucketJoinProcCtx.getNumBuckets());
+        tezBucketJoinProcCtx.getNumBuckets(), null);
     mapJoinOp.setOpTraits(opTraits);
+    mapJoinOp.setStatistics(joinOp.getStatistics());
     setNumberOfBucketsOnChildren(mapJoinOp);
 
-    // Once the conversion is done, we can set the partitioner to bucket cols on the small table    
+    // Once the conversion is done, we can set the partitioner to bucket cols on the small table
     Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
     bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
     joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
@@ -182,6 +343,54 @@ public class ConvertJoinMapJoin implemen
     return true;
   }
 
+  /*
+   * This method tries to convert a join to an SMB. This is done based on
+   * traits. If the sorted by columns are the same as the join columns then, we
+   * can convert the join to an SMB. Otherwise retain the bucket map join as it
+   * is still more efficient than a regular join.
+   */
+  private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+
+    ReduceSinkOperator bigTableRS =
+        (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+    int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits()
+            .getNumBuckets();
+
+    // the sort and bucket cols have to match on both sides for this
+    // transformation of the join operation
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      if (!(parentOp instanceof ReduceSinkOperator)) {
+        // could be mux/demux operators. Currently not supported
+        LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time.");
+        return false;
+      }
+      ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
+      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
+          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
+        LOG.info("We cannot convert to SMB because the sort column names do not match.");
+        return false;
+      }
+
+      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
+          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
+          == false) {
+        LOG.info("We cannot convert to SMB because bucket column names do not match.");
+        return false;
+      }
+    }
+
+    boolean isSubQuery = false;
+    if (numBuckets < 0) {
+      isSubQuery = true;
+      numBuckets = bigTableRS.getConf().getNumReducers();
+    }
+    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+    LOG.info("We can convert the join to an SMB join.");
+    return true;
+  }
+
   private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
     int numBuckets = currentOp.getOpTraits().getNumBuckets();
     for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
@@ -193,15 +402,13 @@ public class ConvertJoinMapJoin implemen
   }
 
   /*
-   *  We perform the following checks to see if we can convert to a bucket map join
-   *  1. If the parent reduce sink of the big table side has the same emit key cols as 
-   *  its parent, we can create a bucket map join eliminating the reduce sink.
-   *  2. If we have the table information, we can check the same way as in Mapreduce to 
-   *  determine if we can perform a Bucket Map Join.
+   * If the parent reduce sink of the big table side has the same emit key cols
+   * as its parent, we can create a bucket map join eliminating the reduce sink.
    */
-  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, 
-      OptimizeTezProcContext context, int bigTablePosition, 
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
+      OptimizeTezProcContext context, int bigTablePosition,
+      TezBucketJoinProcCtx tezBucketJoinProcCtx)
+  throws SemanticException {
     // bail on mux-operator because mux operator masks the emit keys of the
     // constituent reduce sinks
     if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -211,14 +418,41 @@ public class ConvertJoinMapJoin implemen
     }
 
     ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+    List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames();
+    Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0);
+    List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
+    int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+    // all keys matched.
+    if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
+        tezBucketJoinProcCtx) == false) {
+      LOG.info("No info available to check for bucket map join. Cannot convert");
+      return false;
+    }
+
     /*
      * this is the case when the big table is a sub-query and is probably
-     * already bucketed by the join column in say a group by operation 
+     * already bucketed by the join column in say a group by operation
      */
-    List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
-    if ((colNames != null) && (colNames.isEmpty() == false)) {
-      Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
-      for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
+    boolean isSubQuery = false;
+    if (numBuckets < 0) {
+      isSubQuery = true;
+      numBuckets = rs.getConf().getNumReducers();
+    }
+    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+    return true;
+  }
+
+  private boolean checkColEquality(List<List<String>> grandParentColNames,
+      List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
+      TezBucketJoinProcCtx tezBucketJoinProcCtx) {
+
+    if ((grandParentColNames == null) || (parentColNames == null)) {
+      return false;
+    }
+
+    if ((parentColNames != null) && (parentColNames.isEmpty() == false)) {
+      for (List<String> listBucketCols : grandParentColNames) {
         // can happen if this operator does not carry forward the previous bucketing columns
         // for e.g. another join operator which does not carry one of the sides' key columns
         if (listBucketCols.isEmpty()) {
@@ -226,9 +460,9 @@ public class ConvertJoinMapJoin implemen
         }
         int colCount = 0;
         // parent op is guaranteed to have a single list because it is a reduce sink
-        for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
+        for (String colName : parentColNames.get(0)) {
           // all columns need to be at least a subset of the parentOfParent's bucket cols
-          ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
+          ExprNodeDesc exprNodeDesc = colExprMap.get(colName);
           if (exprNodeDesc instanceof ExprNodeColumnDesc) {
             if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
               colCount++;
@@ -236,32 +470,21 @@ public class ConvertJoinMapJoin implemen
               break;
             }
           }
-          
-          if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
-            // all keys matched.
-            int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
-            boolean isSubQuery = false;
-            if (numBuckets < 0) {
-              isSubQuery = true;
-              numBuckets = rs.getConf().getNumReducers();
-            }
-            tezBucketJoinProcCtx.setNumBuckets(numBuckets);
-            tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+
+          if (colCount == parentColNames.get(0).size()) {
             return true;
           }
         }
       }
       return false;
     }
-
-    LOG.info("No info available to check for bucket map join. Cannot convert");
     return false;
   }
 
-  public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, 
+  public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
       int buckets) {
-    Set<Integer> bigTableCandidateSet = MapJoinProcessor.
-      getBigTableCandidates(joinOp.getConf().getConds());
+    Set<Integer> bigTableCandidateSet =
+        MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
 
     long maxSize = context.conf.getLongVar(
         HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -287,7 +510,7 @@ public class ConvertJoinMapJoin implemen
       long inputSize = currInputStat.getDataSize();
       if ((bigInputStat == null) ||
           ((bigInputStat != null) &&
-           (inputSize > bigInputStat.getDataSize()))) {
+          (inputSize > bigInputStat.getDataSize()))) {
 
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
@@ -347,9 +570,9 @@ public class ConvertJoinMapJoin implemen
    * for tez.
    */
 
-  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
+  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
       int bigTablePosition) throws SemanticException {
-    // bail on mux operator because currently the mux operator masks the emit keys 
+    // bail on mux operator because currently the mux operator masks the emit keys
     // of the constituent reduce sinks.
     for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
       if (parentOp instanceof MuxOperator) {
@@ -359,12 +582,12 @@ public class ConvertJoinMapJoin implemen
 
     //can safely convert the join to a map join.
     ParseContext parseContext = context.parseContext;
-    MapJoinOperator mapJoinOp = MapJoinProcessor.
-      convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
-          joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+    MapJoinOperator mapJoinOp =
+        MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
+            parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
 
-    Operator<? extends OperatorDesc> parentBigTableOp
-      = mapJoinOp.getParentOperators().get(bigTablePosition);
+    Operator<? extends OperatorDesc> parentBigTableOp =
+        mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
       for (Operator<?> p : parentBigTableOp.getParentOperators()) {
         // we might have generated a dynamic partition operator chain. Since
@@ -380,11 +603,10 @@ public class ConvertJoinMapJoin implemen
         }
       }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
-      if (!(mapJoinOp.getParentOperators().contains(
-              parentBigTableOp.getParentOperators().get(0)))) {
+      if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
             parentBigTableOp.getParentOperators().get(0));
-              }
+      }
       parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
       for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
         if (!(op.getChildOperators().contains(mapJoinOp))) {

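For illustration, a simplified summary of the join-strategy decision that process() now makes. The helper below is hypothetical; the booleans stand in for the HiveConf checks (HIVECONVERTJOIN, HIVE_AUTO_SORTMERGE_JOIN) and the results of getMapJoinConversionPos()/checkConvertJoinSMBJoin() introduced in this commit.

    // Hypothetical summary only -- not the literal control flow of process().
    static String chooseJoinStrategy(boolean convertJoinOn, boolean autoSmbOn,
        boolean smallTableFitsInMemory, boolean bucketAndSortColsMatch) {
      if (!convertJoinOn && !autoSmbOn) {
        return "common merge join";        // convertJoinSMBJoin(..., adjustParentsChildren = false)
      }
      if (smallTableFitsInMemory) {
        return "map join";                 // bucket map join if bucket traits line up, else plain map join
      }
      if (autoSmbOn && bucketAndSortColsMatch) {
        return "SMB join";                 // convertJoinSMBJoin(..., adjustParentsChildren = true)
      }
      return "common merge join";          // fallback: the shuffle join executed as a common merge join
    }
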
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Sep 24 07:03:35 2014
@@ -389,157 +389,8 @@ public class MapJoinProcessor implements
       JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
       throws SemanticException {
 
-    JoinDesc desc = op.getConf();
-    JoinCondDesc[] condns = desc.getConds();
-    Byte[] tagOrder = desc.getTagOrder();
-
-    // outer join cannot be performed on a table which is being cached
-    if (!noCheckOuterJoin) {
-      if (checkMapJoin(mapJoinPos, condns) < 0) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-    }
-
-    // Walk over all the sources (which are guaranteed to be reduce sink
-    // operators).
-    // The join outputs a concatenation of all the inputs.
-    QBJoinTree leftSrc = joinTree.getJoinSrc();
-    List<ReduceSinkOperator> oldReduceSinkParentOps =
-        new ArrayList<ReduceSinkOperator>(op.getNumParent());
-    if (leftSrc != null) {
-      // assert mapJoinPos == 0;
-      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
-      assert parentOp.getParentOperators().size() == 1;
-      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-    }
-
-
-    byte pos = 0;
-    for (String src : joinTree.getBaseSrc()) {
-      if (src != null) {
-        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
-        assert parentOp.getParentOperators().size() == 1;
-        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-      }
-      pos++;
-    }
-
-    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
-    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
-    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
-    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
-      byte tag = entry.getKey();
-      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
-
-      List<ExprNodeDesc> values = entry.getValue();
-      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
-      newValueExprs.put(tag, newValues);
-      for (int i = 0; i < schema.size(); i++) {
-        ColumnInfo column = schema.get(i);
-        if (column == null) {
-          continue;
-        }
-        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
-        int index = ExprNodeDescUtils.indexOf(expr, values);
-        if (index >= 0) {
-          colExprMap.put(column.getInternalName(), newValues.get(index));
-          schema.set(i, null);
-        }
-      }
-    }
-
-    // rewrite value index for mapjoin
-    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
-
-    // get the join keys from old parent ReduceSink operators
-    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
-    // construct valueTableDescs and valueFilteredTableDescs
-    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
-    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
-    int[][] filterMap = desc.getFilterMap();
-    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
-      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
-      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
-      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
-      if (pos != mapJoinPos) {
-        // remove values in key exprs for value table schema
-        // value expression for hashsink will be modified in LocalMapJoinProcessor
-        int[] valueIndex = new int[valueCols.size()];
-        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
-        for (int i = 0; i < valueIndex.length; i++) {
-          ExprNodeDesc expr = valueCols.get(i);
-          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
-          if (kindex >= 0) {
-            valueIndex[i] = kindex;
-          } else {
-            valueIndex[i] = -valueColsInValueExpr.size() - 1;
-            valueColsInValueExpr.add(expr);
-          }
-        }
-        if (needValueIndex(valueIndex)) {
-          valueIndices.put(pos, valueIndex);
-        }
-        valueCols = valueColsInValueExpr;
-      }
-      // deep copy expr node desc
-      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
-      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
-        ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
-            .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
-        valueFilteredCols.add(isFilterDesc);
-      }
-
-      TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
-      TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
-
-      valueTableDescs.add(valueTableDesc);
-      valueFilteredTableDescs.add(valueFilteredTableDesc);
-
-      keyExprMap.put(pos, keyCols);
-    }
-
-    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
-    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
-      byte srcTag = entry.getKey();
-      List<ExprNodeDesc> filter = entry.getValue();
-
-      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
-    }
-    desc.setFilters(filters = newFilters);
-
-    // create dumpfile prefix needed to create descriptor
-    String dumpFilePrefix = "";
-    if( joinTree.getMapAliases() != null ) {
-      for(String mapAlias : joinTree.getMapAliases()) {
-        dumpFilePrefix = dumpFilePrefix + mapAlias;
-      }
-      dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix();
-    } else {
-      dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
-    }
-
-    List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
-
-    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
-        PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
-    JoinCondDesc[] joinCondns = op.getConf().getConds();
-    MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
-        valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
-        filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
-    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
-    mapJoinDescriptor.setTagOrder(tagOrder);
-    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
-    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
-    if (!valueIndices.isEmpty()) {
-      mapJoinDescriptor.setValueIndices(valueIndices);
-    }
+    MapJoinDesc mapJoinDescriptor =
+        getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
 
     // reduce sink row resolver used to generate map join op
     RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -551,6 +402,7 @@ public class MapJoinProcessor implements
     opParseCtxMap.put(mapJoinOp, ctx);
 
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
     mapJoinOp.setColumnExprMap(colExprMap);
 
     List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
@@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements
     }
 
   }
+
+  public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
+      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+    JoinDesc desc = op.getConf();
+    JoinCondDesc[] condns = desc.getConds();
+    Byte[] tagOrder = desc.getTagOrder();
+
+    // outer join cannot be performed on a table which is being cached
+    if (!noCheckOuterJoin) {
+      if (checkMapJoin(mapJoinPos, condns) < 0) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+
+    // Walk over all the sources (which are guaranteed to be reduce sink
+    // operators).
+    // The join outputs a concatenation of all the inputs.
+    QBJoinTree leftSrc = joinTree.getJoinSrc();
+    List<ReduceSinkOperator> oldReduceSinkParentOps =
+        new ArrayList<ReduceSinkOperator>(op.getNumParent());
+    if (leftSrc != null) {
+      // assert mapJoinPos == 0;
+      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+      assert parentOp.getParentOperators().size() == 1;
+      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+    }
+
+    byte pos = 0;
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+        assert parentOp.getParentOperators().size() == 1;
+        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+      }
+      pos++;
+    }
+
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
+    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
+    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
+    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
+      byte tag = entry.getKey();
+      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
+
+      List<ExprNodeDesc> values = entry.getValue();
+      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
+      newValueExprs.put(tag, newValues);
+      for (int i = 0; i < schema.size(); i++) {
+        ColumnInfo column = schema.get(i);
+        if (column == null) {
+          continue;
+        }
+        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
+        int index = ExprNodeDescUtils.indexOf(expr, values);
+        if (index >= 0) {
+          colExprMap.put(column.getInternalName(), newValues.get(index));
+          schema.set(i, null);
+        }
+      }
+    }
+
+    // rewrite value index for mapjoin
+    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+    // get the join keys from old parent ReduceSink operators
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+    // construct valueTableDescs and valueFilteredTableDescs
+    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
+    int[][] filterMap = desc.getFilterMap();
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+      if (pos != mapJoinPos) {
+        // remove values in key exprs for value table schema
+        // value expression for hashsink will be modified in
+        // LocalMapJoinProcessor
+        int[] valueIndex = new int[valueCols.size()];
+        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+        for (int i = 0; i < valueIndex.length; i++) {
+          ExprNodeDesc expr = valueCols.get(i);
+          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+          if (kindex >= 0) {
+            valueIndex[i] = kindex;
+          } else {
+            valueIndex[i] = -valueColsInValueExpr.size() - 1;
+            valueColsInValueExpr.add(expr);
+          }
+        }
+        if (needValueIndex(valueIndex)) {
+          valueIndices.put(pos, valueIndex);
+        }
+        valueCols = valueColsInValueExpr;
+      }
+      // deep copy expr node desc
+      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
+      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
+        ExprNodeColumnDesc isFilterDesc =
+            new ExprNodeColumnDesc(
+                TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter",
+                "filter", false);
+        valueFilteredCols.add(isFilterDesc);
+      }
+
+      TableDesc valueTableDesc =
+          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols,
+              "mapjoinvalue"));
+      TableDesc valueFilteredTableDesc =
+          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+              valueFilteredCols, "mapjoinvalue"));
+
+      valueTableDescs.add(valueTableDesc);
+      valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+      keyExprMap.put(pos, keyCols);
+    }
+
+    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+      byte srcTag = entry.getKey();
+      List<ExprNodeDesc> filter = entry.getValue();
+
+      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+    }
+    desc.setFilters(filters = newFilters);
+
+    // create dumpfile prefix needed to create descriptor
+    String dumpFilePrefix = "";
+    if (joinTree.getMapAliases() != null) {
+      for (String mapAlias : joinTree.getMapAliases()) {
+        dumpFilePrefix = dumpFilePrefix + mapAlias;
+      }
+      dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    } else {
+      dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    }
+
+    List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos);
+
+    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    TableDesc keyTableDesc =
+        PlanUtils.getMapJoinKeyTableDesc(hconf,
+            PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    JoinCondDesc[] joinCondns = op.getConf().getConds();
+    MapJoinDesc mapJoinDescriptor =
+        new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
+            valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
+                .getConf().getNoOuterJoin(), dumpFilePrefix);
+    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
+    mapJoinDescriptor.setTagOrder(tagOrder);
+    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
+    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+    if (!valueIndices.isEmpty()) {
+      mapJoinDescriptor.setValueIndices(valueIndices);
+    }
+
+    return mapJoinDescriptor;
+  }
 }
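
The valueIndex array built above records, for each value column of a small-table input, whether the column can be read back from the join key or from the trimmed value row: a non-negative entry k means "key column k", a negative entry -p-1 means "position p of the trimmed value row". A minimal, self-contained sketch of that convention (illustration only, not part of this patch; plain strings stand in for ExprNodeDesc and the class name is hypothetical):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ValueIndexSketch {

  static int[] buildValueIndex(List<String> valueCols, List<String> keyCols,
      List<String> trimmedValueCols) {
    int[] valueIndex = new int[valueCols.size()];
    for (int i = 0; i < valueIndex.length; i++) {
      int kindex = keyCols.indexOf(valueCols.get(i));
      if (kindex >= 0) {
        // already shipped as a key column; remember which one
        valueIndex[i] = kindex;
      } else {
        // stays in the value row; -size-1 encodes its position in the trimmed list
        valueIndex[i] = -trimmedValueCols.size() - 1;
        trimmedValueCols.add(valueCols.get(i));
      }
    }
    return valueIndex;
  }

  static String lookup(int[] valueIndex, int i, List<String> keyRow, List<String> valueRow) {
    int idx = valueIndex[i];
    return idx >= 0 ? keyRow.get(idx) : valueRow.get(-idx - 1);
  }

  public static void main(String[] args) {
    List<String> keyCols = Arrays.asList("k0", "c1");
    List<String> valueCols = Arrays.asList("c1", "c2", "c3");
    List<String> trimmed = new ArrayList<String>();
    int[] valueIndex = buildValueIndex(valueCols, keyCols, trimmed);
    // c1 resolves to key column 1; c2 and c3 to positions 0 and 1 of the trimmed value row.
    System.out.println(lookup(valueIndex, 0, keyCols, trimmed)); // c1
    System.out.println(lookup(valueIndex, 2, keyCols, trimmed)); // c3
  }
}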

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java?rev=1627235&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MergeJoinProc.java Wed Sep 24 07:03:35 2014
@@ -0,0 +1,100 @@
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
+
+public class MergeJoinProc implements NodeProcessor {
+
+  public Operator<? extends OperatorDesc> getLeafOperator(Operator<? extends OperatorDesc> op) {
+    for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+      // FileSink or ReduceSink operators are used to create vertices. See
+      // TezCompiler.
+      if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) {
+        return childOp;
+      } else {
+        return getLeafOperator(childOp);
+      }
+    }
+
+    return null;
+  }
+
+  @Override
+  public Object
+      process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+    GenTezProcContext context = (GenTezProcContext) procCtx;
+    CommonMergeJoinOperator mergeJoinOp = (CommonMergeJoinOperator) nd;
+    if (stack.size() < 2 || !(stack.get(stack.size() - 2) instanceof DummyStoreOperator)) {
+      context.currentMergeJoinOperator = mergeJoinOp;
+      return null;
+    }
+
+    TezWork tezWork = context.currentTask.getWork();
+    @SuppressWarnings("unchecked")
+    Operator<? extends OperatorDesc> parentOp =
+        (Operator<? extends OperatorDesc>) ((stack.get(stack.size() - 2)));
+    // Guaranteed to be just 1 because each DummyStoreOperator can be part of only one work.
+    BaseWork parentWork = context.childToWorkMap.get(parentOp).get(0);
+
+
+    // Attach the work created during the dummy store walk to this merge join. If a merge work
+    // already exists for the merge join operator, add the dummy store's work to it; otherwise
+    // create a new merge work and add the work to that.
+    MergeJoinWork mergeWork = null;
+    if (context.opMergeJoinWorkMap.containsKey(getLeafOperator(mergeJoinOp))) {
+      // we already have the merge work corresponding to this merge join operator
+      mergeWork = context.opMergeJoinWorkMap.get(getLeafOperator(mergeJoinOp));
+    } else {
+      mergeWork = new MergeJoinWork();
+      tezWork.add(mergeWork);
+      context.opMergeJoinWorkMap.put(getLeafOperator(mergeJoinOp), mergeWork);
+    }
+
+    mergeWork.setMergeJoinOperator(mergeJoinOp);
+    mergeWork.addMergedWork(null, parentWork);
+    tezWork.setVertexType(mergeWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
+
+    for (BaseWork grandParentWork : tezWork.getParents(parentWork)) {
+      parentWork.setName(grandParentWork.getName());
+      TezEdgeProperty edgeProp = tezWork.getEdgeProperty(grandParentWork, parentWork);
+      tezWork.disconnect(grandParentWork, parentWork);
+      tezWork.connect(grandParentWork, mergeWork, edgeProp);
+    }
+
+    for (BaseWork childWork : tezWork.getChildren(parentWork)) {
+      TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, childWork);
+      tezWork.disconnect(parentWork, childWork);
+      tezWork.connect(mergeWork, childWork, edgeProp);
+    }
+
+    tezWork.remove(parentWork);
+
+    DummyStoreOperator dummyOp = (DummyStoreOperator) (stack.get(stack.size() - 2));
+
+    parentWork.setTag(mergeJoinOp.getTagForOperator(dummyOp));
+
+    mergeJoinOp.getParentOperators().remove(dummyOp);
+    dummyOp.getChildOperators().clear();
+
+    return true;
+  }
+}
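
The block that splices parentWork out of the DAG and routes its edges through the new MergeJoinWork follows a generic "replace a vertex" pattern. A hedged sketch of that pattern, using only the TezWork methods exercised above (getParents, getChildren, getEdgeProperty, disconnect, connect, remove); the helper class and method names are hypothetical:

import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezWork;

/** Hypothetical helper: re-routes every edge of oldWork through newWork, then drops oldWork. */
final class WorkGraphUtils {

  static void replaceWork(TezWork tezWork, BaseWork oldWork, BaseWork newWork) {
    // Incoming edges: parent -> oldWork becomes parent -> newWork, keeping the edge property.
    for (BaseWork parent : tezWork.getParents(oldWork)) {
      TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parent, oldWork);
      tezWork.disconnect(parent, oldWork);
      tezWork.connect(parent, newWork, edgeProp);
    }
    // Outgoing edges: oldWork -> child becomes newWork -> child.
    for (BaseWork child : tezWork.getChildren(oldWork)) {
      TezEdgeProperty edgeProp = tezWork.getEdgeProperty(oldWork, child);
      tezWork.disconnect(oldWork, child);
      tezWork.connect(newWork, child, edgeProp);
    }
    tezWork.remove(oldWork);
  }

  private WorkGraphUtils() {
  }
}

In the processor above the rewiring is inlined rather than factored out, and parentWork additionally stays reachable through mergeWork via addMergedWork before it is removed from the TezWork graph.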

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Sep 24 07:03:35 2014
@@ -51,7 +51,12 @@ public class Optimizer {
    * @param hiveConf
    */
   public void initialize(HiveConf hiveConf) {
+
+    boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+    boolean bucketMapJoinOptimizer = false;
+
     transformations = new ArrayList<Transform>();
+
     // Add the transformation that computes the lineage information.
     transformations.add(new Generator());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
@@ -81,15 +86,16 @@ public class Optimizer {
     }
     transformations.add(new SamplePruner());
     transformations.add(new MapJoinProcessor());
-    boolean bucketMapJoinOptimizer = false;
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
+
+    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) {
       transformations.add(new BucketMapJoinOptimizer());
       bucketMapJoinOptimizer = true;
     }
 
     // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
     // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
+        && !isTezExecEngine) {
       if (!bucketMapJoinOptimizer) {
         // No need to add BucketMapJoinOptimizer twice
         transformations.add(new BucketMapJoinOptimizer());
@@ -119,7 +125,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
-        !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        !isTezExecEngine) {
       transformations.add(new CorrelationOptimizer());
     }
     if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
@@ -128,8 +134,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    if (pctx.getContext().getExplain()
-        && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+    if (pctx.getContext().getExplain() && !isTezExecEngine) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }
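
The refactor above hoists the execution-engine check into a single flag and reuses it to skip the bucket map join, sorted-merge bucket map join, correlation, and explain-annotation transforms on Tez. A sketch of the same guard extracted as a predicate (sketch only, under the assumption that a HiveConf instance is at hand; the helper class name is hypothetical):

import org.apache.hadoop.hive.conf.HiveConf;

/** Sketch only: the execution-engine guard hoisted into a reusable predicate. */
final class ExecEngineCheck {

  static boolean isTezExecEngine(HiveConf hiveConf) {
    return "tez".equals(HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
  }

  private ExecEngineCheck() {
  }
}

Usage mirrors the diff: the MapReduce-only bucket map join transforms are registered only when this returns false, presumably because Tez gets its own merge-join handling in this very commit.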

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Sep 24 07:03:35 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 public class ReduceSinkMapJoinProc implements NodeProcessor {
@@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple
         TezWork tezWork = context.currentTask.getWork();
         LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
         tezWork.connect(parentWork, myWork, edgeProp);
-        
+        if (edgeType == EdgeType.CUSTOM_EDGE) {
+          tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES);
+        }
+
         ReduceSinkOperator r = null;
         if (parentRS.getConf().getOutputName() != null) {
           LOG.debug("Cloning reduce sink for multi-child broadcast edge");

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Wed Sep 24 07:03:35 2014
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -104,7 +105,12 @@ public class OpTraitsRulesProcFactory {
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       listBucketCols.add(bucketCols);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      int numBuckets = -1;
+      OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits();
+      if (parentOpTraits != null) {
+        numBuckets = parentOpTraits.getNumBuckets();
+      }
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
       rs.setOpTraits(opTraits);
       return null;
     }
@@ -163,15 +169,21 @@ public class OpTraitsRulesProcFactory {
       } catch (HiveException e) {
         prunedPartList = null;
       }
-      boolean bucketMapJoinConvertible = checkBucketedTable(table, 
+      boolean isBucketed = checkBucketedTable(table,
           opTraitsCtx.getParseContext(), prunedPartList);
-      List<List<String>>bucketCols = new ArrayList<List<String>>();
+      List<List<String>> bucketColsList = new ArrayList<List<String>>();
+      List<List<String>> sortedColsList = new ArrayList<List<String>>();
       int numBuckets = -1;
-      if (bucketMapJoinConvertible) {
-        bucketCols.add(table.getBucketCols());
+      if (isBucketed) {
+        bucketColsList.add(table.getBucketCols());
         numBuckets = table.getNumBuckets();
+        List<String> sortCols = new ArrayList<String>();
+        for (Order colSortOrder : table.getSortCols()) {
+          sortCols.add(colSortOrder.getCol());
+        }
+        sortedColsList.add(sortCols);
       }
-      OpTraits opTraits = new OpTraits(bucketCols, numBuckets);
+      OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
       ts.setOpTraits(opTraits);
       return null;
     }
@@ -197,7 +209,7 @@ public class OpTraitsRulesProcFactory {
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       listBucketCols.add(gbyKeys);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
       gbyOp.setOpTraits(opTraits);
       return null;
     }
@@ -205,22 +217,17 @@ public class OpTraitsRulesProcFactory {
 
   public static class SelectRule implements NodeProcessor {
 
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      SelectOperator selOp = (SelectOperator)nd;
-      List<List<String>> parentBucketColNames = 
-          selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
-
+    public List<List<String>> getConvertedColNames(List<List<String>> parentColNames,
+        SelectOperator selOp) {
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       if (selOp.getColumnExprMap() != null) {
-        if (parentBucketColNames != null) {
-          for (List<String> colNames : parentBucketColNames) {
+        if (parentColNames != null) {
+          for (List<String> colNames : parentColNames) {
             List<String> bucketColNames = new ArrayList<String>();
             for (String colName : colNames) {
               for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
                 if (entry.getValue() instanceof ExprNodeColumnDesc) {
-                  if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) {
+                  if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) {
                     bucketColNames.add(entry.getKey());
                   }
                 }
@@ -231,11 +238,34 @@ public class OpTraitsRulesProcFactory {
         }
       }
 
+      return listBucketCols;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      SelectOperator selOp = (SelectOperator)nd;
+      List<List<String>> parentBucketColNames =
+          selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
+
+      List<List<String>> listBucketCols = null;
+      List<List<String>> listSortCols = null;
+      if (selOp.getColumnExprMap() != null) {
+        if (parentBucketColNames != null) {
+          listBucketCols = getConvertedColNames(parentBucketColNames, selOp);
+        }
+        List<List<String>> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits()
+            .getSortCols();
+        if (parentSortColNames != null) {
+          listSortCols = getConvertedColNames(parentSortColNames, selOp);
+        }
+      }
+
       int numBuckets = -1;
       if (selOp.getParentOperators().get(0).getOpTraits() != null) {
         numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets();
       }
-      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets);
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
       selOp.setOpTraits(opTraits);
       return null;
     }
@@ -248,6 +278,7 @@ public class OpTraitsRulesProcFactory {
         Object... nodeOutputs) throws SemanticException {
       JoinOperator joinOp = (JoinOperator)nd;
       List<List<String>> bucketColsList = new ArrayList<List<String>>();
+      List<List<String>> sortColsList = new ArrayList<List<String>>();
       byte pos = 0;
       for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
         if (!(parentOp instanceof ReduceSinkOperator)) {
@@ -259,26 +290,24 @@ public class OpTraitsRulesProcFactory {
           ReduceSinkRule rsRule = new ReduceSinkRule();
           rsRule.process(rsOp, stack, procCtx, nodeOutputs);
         }
-        bucketColsList.add(getOutputColNames(joinOp, rsOp, pos));
+        bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos));
+        sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos));
         pos++;
       }
 
-      joinOp.setOpTraits(new OpTraits(bucketColsList, -1));
+      joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
       return null;
     }
 
-    private List<String> getOutputColNames(JoinOperator joinOp,
-        ReduceSinkOperator rs, byte pos) {
-      List<List<String>> parentBucketColNames =
-          rs.getOpTraits().getBucketColNames();
-
-      if (parentBucketColNames != null) {
+    private List<String> getOutputColNames(JoinOperator joinOp, List<List<String>> parentColNames,
+        byte pos) {
+      if (parentColNames != null) {
         List<String> bucketColNames = new ArrayList<String>();
 
         // guaranteed that there is only 1 list within this list because
         // a reduce sink always brings down the bucketing cols to a single list.
         // may not be true with correlation operators (mux-demux)
-        List<String> colNames = parentBucketColNames.get(0);
+        List<String> colNames = parentColNames.get(0);
         for (String colName : colNames) {
           for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) {
             if (exprNode instanceof ExprNodeColumnDesc) {
@@ -317,7 +346,7 @@ public class OpTraitsRulesProcFactory {
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      OpTraits opTraits = new OpTraits(null, -1);
+      OpTraits opTraits = new OpTraits(null, -1, null);
       @SuppressWarnings("unchecked")
       Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd;
       operator.setOpTraits(opTraits);
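
getConvertedColNames above translates the parent's bucketing and sort column names into the select operator's output aliases by walking its column expression map. A self-contained sketch of the same lookup with a plain Map<String, String> standing in for the ExprNodeColumnDesc-based map (illustration only; the class and sample names are hypothetical):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColNameMappingSketch {

  // columnExprMap: output alias -> underlying input column name.
  static List<List<String>> convert(List<List<String>> parentColNames,
      Map<String, String> columnExprMap) {
    List<List<String>> converted = new ArrayList<List<String>>();
    for (List<String> colNames : parentColNames) {
      List<String> outNames = new ArrayList<String>();
      for (String colName : colNames) {
        for (Map.Entry<String, String> entry : columnExprMap.entrySet()) {
          if (entry.getValue().equals(colName)) {
            // keep the alias that downstream operators will see
            outNames.add(entry.getKey());
          }
        }
      }
      converted.add(outNames);
    }
    return converted;
  }

  public static void main(String[] args) {
    Map<String, String> exprMap = new HashMap<String, String>();
    exprMap.put("_col0", "userid"); // e.g. SELECT userid AS _col0
    List<List<String>> parentBucketCols = new ArrayList<List<String>>();
    parentBucketCols.add(Arrays.asList("userid"));
    System.out.println(convert(parentBucketCols, exprMap)); // [[_col0]]
  }
}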

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java Wed Sep 24 07:03:35 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -152,6 +154,11 @@ public class CrossProductCheck implement
 
   private void checkMapJoins(TezWork tzWrk) throws SemanticException {
     for(BaseWork wrk : tzWrk.getAllWork() ) {
+
+      if ( wrk instanceof MergeJoinWork ) {
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
       List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
       if ( !warnings.isEmpty() ) {
         for(String w : warnings) {
@@ -163,12 +170,17 @@ public class CrossProductCheck implement
 
   private void checkTezReducer(TezWork tzWrk) throws SemanticException {
     for(BaseWork wrk : tzWrk.getAllWork() ) {
-      if ( !(wrk instanceof ReduceWork) ) {
+
+      if ( wrk instanceof MergeJoinWork ) {
+        wrk = ((MergeJoinWork)wrk).getMainWork();
+      }
+
+      if ( !(wrk instanceof ReduceWork) ) {
         continue;
       }
       ReduceWork rWork = (ReduceWork) wrk;
       Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
-      if ( reducer instanceof JoinOperator ) {
+      if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
         Map<Integer, ExtractReduceSinkInfo.Info> rsInfo =
             new HashMap<Integer, ExtractReduceSinkInfo.Info>();
         for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
@@ -185,7 +197,7 @@ public class CrossProductCheck implement
       return;
     }
     Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
-    if ( reducer instanceof JoinOperator ) {
+    if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
       BaseWork prntWork = mrWrk.getMapWork();
       checkForCrossProduct(taskName, reducer,
           new ExtractReduceSinkInfo(null).analyze(prntWork));
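
Both checks above first unwrap a MergeJoinWork to the work that actually holds the operator tree before inspecting it. A hedged sketch of that unwrapping as a small helper (the helper is hypothetical; getMainWork is the method used in the diff):

import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;

/** Hypothetical helper: resolve a work to the work whose operators should be analyzed. */
final class WorkUnwrap {

  static BaseWork mainWorkOf(BaseWork wrk) {
    // MergeJoinWork wraps the main work; the cross-product checks care about the main work.
    return (wrk instanceof MergeJoinWork) ? ((MergeJoinWork) wrk).getMainWork() : wrk;
  }

  private WorkUnwrap() {
  }
}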

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Wed Sep 24 07:03:35 2014
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
@@ -132,6 +134,8 @@ public class GenTezProcContext implement
 
   // remember which reducesinks we've already connected
   public final Set<ReduceSinkOperator> connectedReduceSinks;
+  public final Map<Operator<?>, MergeJoinWork> opMergeJoinWorkMap;
+  public CommonMergeJoinOperator currentMergeJoinOperator;
 
   // remember the event operators we've seen
   public final Set<AppMasterEventOperator> eventOperatorSet;
@@ -176,6 +180,8 @@ public class GenTezProcContext implement
     this.eventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
     this.abandonedEventOperatorSet = new LinkedHashSet<AppMasterEventOperator>();
     this.tsToEventMap = new LinkedHashMap<TableScanOperator, List<AppMasterEventOperator>>();
+    this.opMergeJoinWorkMap = new LinkedHashMap<Operator<?>, MergeJoinWork>();
+    this.currentMergeJoinOperator = null;
 
     rootTasks.add(currentTask);
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1627235&r1=1627234&r2=1627235&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Wed Sep 24 07:03:35 2014
@@ -167,7 +167,8 @@ public class GenTezUtils {
     GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
 
     // remember which parent belongs to which tag
-    reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+    int tag = reduceSink.getConf().getTag();
+    reduceWork.getTagToInput().put(tag == -1 ? 0 : tag,
          context.preceedingWork.getName());
 
     // remember the output name of the reduce sink


