hive-commits mailing list archives

From: c..@apache.org
Subject: svn commit: r1382098 [1/2] - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/java/org/apache/hadoop/hive/ql/util/ ql/src/test/queries/client...
Date: Fri, 07 Sep 2012 17:40:15 GMT
Author: cws
Date: Fri Sep  7 17:40:14 2012
New Revision: 1382098

URL: http://svn.apache.org/viewvc?rev=1382098&view=rev
Log:
HIVE-3171. Bucketed sort merge join doesn't work when multiple files exist for small alias (Navis Ryu via cws)
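
The approach: instead of wiring a single FetchOperator per small-table alias, SMBMapJoinOperator (below) now builds a MergeQueue that opens one FetchOperator per small-table bucket file and merges their already-sorted rows into one ascending stream, so a single big-table bucket file can be joined against several small-table files. A minimal standalone sketch of that k-way merge, using plain java.util.PriorityQueue only to stay self-contained (the patch itself extends Hadoop's org.apache.hadoop.util.PriorityQueue, see SMBMapJoinOperator.java below):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class SortedBucketMergeSketch {
  // merges k individually sorted "bucket files" into one ascending stream
  public static List<Integer> merge(List<List<Integer>> sortedBucketFiles) {
    // each queue entry is {current head value, source file index}
    PriorityQueue<int[]> queue = new PriorityQueue<int[]>(
        Math.max(1, sortedBucketFiles.size()), new Comparator<int[]>() {
          public int compare(int[] a, int[] b) { return a[0] - b[0]; }
        });
    List<Iterator<Integer>> cursors = new ArrayList<Iterator<Integer>>();
    for (int i = 0; i < sortedBucketFiles.size(); i++) {
      Iterator<Integer> it = sortedBucketFiles.get(i).iterator();
      cursors.add(it);
      if (it.hasNext()) {
        queue.add(new int[]{it.next(), i});
      }
    }
    List<Integer> merged = new ArrayList<Integer>();
    while (!queue.isEmpty()) {
      int[] head = queue.poll();                  // global minimum
      merged.add(head[0]);
      Iterator<Integer> it = cursors.get(head[1]);
      if (it.hasNext()) {
        queue.add(new int[]{it.next(), head[1]}); // refill from the same file
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    System.out.println(merge(Arrays.asList(
        Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8))));
    // prints [1, 2, 3, 4, 5, 6, 7, 8]
  }
}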

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java   (with props)
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_6.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java Fri Sep  7 17:40:14 2012
@@ -17,18 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOContext;
-import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
 
 public class ExecMapperContext {
 
@@ -110,26 +104,6 @@ public class ExecMapperContext {
     this.lastInputFile = lastInputFile;
   }
 
-
-  private void setUpFetchOpContext(FetchOperator fetchOp, String alias)
-      throws Exception {
-    String currentInputFile = HiveConf.getVar(jc,
-        HiveConf.ConfVars.HADOOPMAPFILENAME);
-    BucketMapJoinContext bucketMatcherCxt = this.localWork
-        .getBucketMapjoinContext();
-    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
-        .getBucketMatcherClass();
-    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
-        bucketMatcherCls, null);
-    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
-        .getAliasBucketFileNameMapping());
-
-    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
-        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
-  }
-
   public String getCurrentInputFile() {
     currentInputFile = this.ioCxt.getInputFile();
     return currentInputFile;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Fri Sep  7 17:40:14 2012
@@ -143,6 +143,10 @@ public class FetchOperator implements Se
     } else {
       isNativeTable = true;
     }
+    setupExecContext();
+  }
+
+  private void setupExecContext() {
     if (hasVC || work.getSplitSample() != null) {
       context = new ExecMapperContext();
       if (operator != null) {
@@ -536,6 +540,7 @@ public class FetchOperator implements Se
         context.clear();
         context = null;
       }
+      this.currTbl = null;
       this.currPath = null;
       this.iterPath = null;
       this.iterPartDesc = null;
@@ -546,21 +551,16 @@ public class FetchOperator implements Se
   }
 
   /**
-   * used for bucket map join. there is a hack for getting partitionDesc. bucket map join right now
-   * only allow one partition present in bucket map join.
+   * used for bucket map join
    */
-  public void setupContext(Iterator<Path> iterPath, Iterator<PartitionDesc> iterPartDesc) {
-    this.iterPath = iterPath;
-    this.iterPartDesc = iterPartDesc;
-    if (iterPartDesc == null) {
-      if (work.isNotPartitioned()) {
-        this.currTbl = work.getTblDesc();
-      } else {
-        // hack, get the first.
-        List<PartitionDesc> listParts = work.getPartDesc();
-        currPart = listParts.isEmpty() ? null : listParts.get(0);
-      }
+  public void setupContext(List<Path> paths) {
+    this.iterPath = paths.iterator();
+    if (work.isNotPartitioned()) {
+      this.currTbl = work.getTblDesc();
+    } else {
+      this.iterPartDesc = work.getPartDescs(paths).iterator();
     }
+    setupExecContext();
   }
 
   /**
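
The signature change above is the API-visible part of the fix: setupContext() now takes the whole list of bucket paths and, for partitioned tables, resolves one PartitionDesc per path via the new FetchWork.getPartDescs() (further down) instead of falling back to the first partition. A hedged sketch of what a caller migration looks like; the helper name feed() is invented for illustration:

import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FetchOperator;

public class SetupContextCaller {
  // aliasFiles would come from BucketMatcher.getAliasBucketFiles(...)
  static void feed(FetchOperator fetchOp, List<Path> aliasFiles) {
    // before this patch: fetchOp.setupContext(aliasFiles.iterator(), null);
    // (a partitioned table then guessed "the first" PartitionDesc)
    fetchOp.setupContext(aliasFiles); // now one PartitionDesc per path
  }
}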

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Fri Sep  7 17:40:14 2012
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -434,8 +433,7 @@ public class MapredLocalTask extends Tas
 
     List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, bucketMatcherCxt
         .getMapJoinBigTableAlias(), alias);
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
+    fetchOp.setupContext(aliasFiles);
   }
 
   @Override

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Sep  7 17:40:14 2012
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -38,12 +40,14 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.util.ObjectPair;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -58,14 +62,15 @@ public class SMBMapJoinOperator extends 
       .getName());
 
   private MapredLocalWork localWork = null;
-  private Map<String, FetchOperator> fetchOperators;
+  private Map<String, MergeQueue> aliasToMergeQueue = Collections.emptyMap();
+
   transient ArrayList<Object>[] keyWritables;
   transient ArrayList<Object>[] nextKeyWritables;
   RowContainer<ArrayList<Object>>[] nextGroupStorage;
   RowContainer<ArrayList<Object>>[] candidateStorage;
 
   transient Map<Byte, String> tagToAlias;
-  private transient boolean[] fetchOpDone;
+  private transient boolean[] fetchDone;
   private transient boolean[] foundNextKeyGroup;
   transient boolean firstFetchHappened = false;
   private transient boolean inputFileChanged = false;
@@ -102,7 +107,7 @@ public class SMBMapJoinOperator extends 
     candidateStorage = new RowContainer[maxAlias];
     keyWritables = new ArrayList[maxAlias];
     nextKeyWritables = new ArrayList[maxAlias];
-    fetchOpDone = new boolean[maxAlias];
+    fetchDone = new boolean[maxAlias];
     foundNextKeyGroup = new boolean[maxAlias];
 
     int bucketSize = HiveConf.getIntVar(hconf,
@@ -123,7 +128,7 @@ public class SMBMapJoinOperator extends 
 
     for (Byte alias : order) {
       if(alias != (byte) posBigTable) {
-        fetchOpDone[alias] = false;
+        fetchDone[alias] = false;
       }
       foundNextKeyGroup[alias] = false;
     }
@@ -142,42 +147,51 @@ public class SMBMapJoinOperator extends 
     }
     localWorkInited = true;
     this.localWork = localWork;
-    fetchOperators = new HashMap<String, FetchOperator>();
+    aliasToMergeQueue = new HashMap<String, MergeQueue>();
 
-    Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
     // create map local operators
-    for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
-        .entrySet()) {
-      JobConf jobClone = new JobConf(hconf);
-      Operator<? extends OperatorDesc> tableScan = localWork.getAliasToWork()
-        .get(entry.getKey());
-      if (tableScan instanceof TableScanOperator) {
-        ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
-        if (list != null) {
-          ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
-        }
-      } else {
-        ColumnProjectionUtils.setFullyReadColumns(jobClone);
+    Map<String,FetchWork> aliasToFetchWork = localWork.getAliasToFetchWork();
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork = localWork.getAliasToWork();
+
+    for (Map.Entry<String, FetchWork> entry : aliasToFetchWork.entrySet()) {
+      String alias = entry.getKey();
+      FetchWork fetchWork = entry.getValue();
+
+      Operator<? extends OperatorDesc> forwardOp = aliasToWork.get(alias);
+      forwardOp.setExecContext(getExecContext());
+
+      JobConf jobClone = cloneJobConf(hconf, forwardOp);
+      FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone);
+      forwardOp.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
+      fetchOp.clearFetchContext();
+
+      MergeQueue mergeQueue = new MergeQueue(alias, fetchWork, jobClone);
+
+      aliasToMergeQueue.put(alias, mergeQueue);
+      l4j.info("fetch operators for " + alias + " initialized");
+    }
+  }
+
+  private JobConf cloneJobConf(Configuration hconf, Operator<?> op) {
+    JobConf jobClone = new JobConf(hconf);
+    if (op instanceof TableScanOperator) {
+      List<Integer> list = ((TableScanOperator)op).getNeededColumnIDs();
+      if (list != null) {
+        ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
       }
-      FetchOperator fetchOp = new FetchOperator(entry.getValue(),jobClone);
-      fetchOpJobConfMap.put(fetchOp, jobClone);
-      fetchOperators.put(entry.getKey(), fetchOp);
-      l4j.info("fetchoperator for " + entry.getKey() + " created");
-    }
-
-    for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-      Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
-        .get(entry.getKey());
-      // All the operators need to be initialized before process
-      forwardOp.setExecContext(this.getExecContext());
-      FetchOperator fetchOp = entry.getValue();
-      JobConf jobConf = fetchOpJobConfMap.get(fetchOp);
-      if (jobConf == null) {
-        jobConf = this.getExecContext().getJc();
+    } else {
+      ColumnProjectionUtils.setFullyReadColumns(jobClone);
+    }
+    return jobClone;
+  }
+
+  private byte tagForAlias(String alias) {
+    for (Map.Entry<Byte, String> entry : tagToAlias.entrySet()) {
+      if (entry.getValue().equals(alias)) {
+        return entry.getKey();
       }
-      forwardOp.initialize(jobConf, new ObjectInspector[] {fetchOp.getOutputObjectInspector()});
-      l4j.info("fetchoperator for " + entry.getKey() + " initialized");
     }
+    return -1;
   }
 
   // The input file has changed - load the correct hash bucket
@@ -196,11 +210,10 @@ public class SMBMapJoinOperator extends 
           joinFinalLeftData();
         }
         // set up the fetch operator for the new input file.
-        for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+        for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
           String alias = entry.getKey();
-          FetchOperator fetchOp = entry.getValue();
-          fetchOp.clearFetchContext();
-          setUpFetchOpContext(fetchOp, alias);
+          MergeQueue mergeQueue = entry.getValue();
+          setUpFetchContexts(alias, mergeQueue);
         }
         firstFetchHappened = false;
         inputFileChanged = false;
@@ -218,7 +231,6 @@ public class SMBMapJoinOperator extends 
     }
 
     byte alias = (byte) tag;
-    // compute keys and values as StandardObjects
 
     // compute keys and values as StandardObjects
     ArrayList<Object> key = JoinUtil.computeKeys(row, joinKeys.get(alias),
@@ -268,24 +280,24 @@ public class SMBMapJoinOperator extends 
   private void joinFinalLeftData() throws HiveException {
     RowContainer bigTblRowContainer = this.candidateStorage[this.posBigTable];
 
-    boolean allFetchOpDone = allFetchOpDone();
+    boolean allFetchDone = allFetchDone();
     // if all left data in small tables are less than and equal to the left data
     // in big table, let's them catch up
     while (bigTblRowContainer != null && bigTblRowContainer.size() > 0
-        && !allFetchOpDone) {
+        && !allFetchDone) {
       joinOneGroup();
       bigTblRowContainer = this.candidateStorage[this.posBigTable];
-      allFetchOpDone = allFetchOpDone();
+      allFetchDone = allFetchDone();
     }
 
-    while (!allFetchOpDone) {
+    while (!allFetchDone) {
       List<Byte> ret = joinOneGroup();
       if (ret == null || ret.size() == 0) {
         break;
       }
       reportProgress();
       numMapRowsRead++;
-      allFetchOpDone = allFetchOpDone();
+      allFetchDone = allFetchDone();
     }
 
     boolean dataInCache = true;
@@ -307,15 +319,15 @@ public class SMBMapJoinOperator extends 
     }
   }
 
-  private boolean allFetchOpDone() {
-    boolean allFetchOpDone = true;
+  private boolean allFetchDone() {
+    boolean allFetchDone = true;
     for (Byte tag : order) {
       if(tag == (byte) posBigTable) {
         continue;
       }
-      allFetchOpDone = allFetchOpDone && fetchOpDone[tag];
+      allFetchDone = allFetchDone && fetchDone[tag];
     }
-    return allFetchOpDone;
+    return allFetchDone;
   }
 
   private List<Byte> joinOneGroup() throws HiveException {
@@ -381,12 +393,12 @@ public class SMBMapJoinOperator extends 
 
     //for tables other than the big table, we need to fetch more data until reach a new group or done.
     while (!foundNextKeyGroup[t]) {
-      if (fetchOpDone[t]) {
+      if (fetchDone[t]) {
         break;
       }
       fetchOneRow(t);
     }
-    if (!foundNextKeyGroup[t] && fetchOpDone[t]) {
+    if (!foundNextKeyGroup[t] && fetchDone[t]) {
       this.nextKeyWritables[t] = null;
     }
   }
@@ -400,10 +412,10 @@ public class SMBMapJoinOperator extends 
     this.nextGroupStorage[t] = oldRowContainer;
   }
 
-  private int compareKeys (ArrayList<Object> k1, ArrayList<Object> k2) {
+  private int compareKeys (List<Object> k1, List<Object> k2) {
     int ret = 0;
 
-   // join keys have difference sizes?
+    // join keys have difference sizes?
     ret = k1.size() - k2.size();
     if (ret != 0) {
       return ret;
@@ -475,53 +487,51 @@ public class SMBMapJoinOperator extends 
     }
   }
 
-  private void setUpFetchOpContext(FetchOperator fetchOp, String alias) {
+  private void setUpFetchContexts(String alias, MergeQueue mergeQueue) throws HiveException {
+    mergeQueue.clearFetchContext();
+
     String currentInputFile = getExecContext().getCurrentInputFile();
-    BucketMapJoinContext bucketMatcherCxt = localWork.getBucketMapjoinContext();
 
-    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
-        .getBucketMatcherClass();
-    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
-        bucketMatcherCls, null);
+    BucketMapJoinContext bucketMatcherCxt = localWork.getBucketMapjoinContext();
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
+    BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, null);
 
     getExecContext().setFileId(bucketMatcherCxt.createFileId(currentInputFile));
     LOG.info("set task id: " + getExecContext().getFileId());
 
     bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
         .getAliasBucketFileNameMapping());
+
     List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
         bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
 
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
+    mergeQueue.setupContext(aliasFiles);
   }
 
   private void fetchOneRow(byte tag) {
-    if (fetchOperators != null) {
-      String tble = this.tagToAlias.get(tag);
-      FetchOperator fetchOp = fetchOperators.get(tble);
+    String table = tagToAlias.get(tag);
+    MergeQueue mergeQueue = aliasToMergeQueue.get(table);
 
-      Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
-          .get(tble);
-      try {
-        InspectableObject row = fetchOp.getNextRow();
-        if (row == null) {
-          this.fetchOpDone[tag] = true;
-          return;
-        }
-        forwardOp.process(row.o, 0);
-        // check if any operator had a fatal error or early exit during
-        // execution
-        if (forwardOp.getDone()) {
-          this.fetchOpDone[tag] = true;
-        }
-      } catch (Throwable e) {
-        if (e instanceof OutOfMemoryError) {
-          // Don't create a new object if we are already out of memory
-          throw (OutOfMemoryError) e;
-        } else {
-          throw new RuntimeException("Map local work failed", e);
-        }
+    Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
+        .get(table);
+    try {
+      InspectableObject row = mergeQueue.getNextRow();
+      if (row == null) {
+        fetchDone[tag] = true;
+        return;
+      }
+      forwardOp.process(row.o, 0);
+      // check if any operator had a fatal error or early exit during
+      // execution
+      if (forwardOp.getDone()) {
+        fetchDone[tag] = true;
+      }
+    } catch (Throwable e) {
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Map local work failed", e);
       }
     }
   }
@@ -536,11 +546,10 @@ public class SMBMapJoinOperator extends 
 
     if (inputFileChanged || !firstFetchHappened) {
       //set up the fetch operator for the new input file.
-      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+      for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
         String alias = entry.getKey();
-        FetchOperator fetchOp = entry.getValue();
-        fetchOp.clearFetchContext();
-        setUpFetchOpContext(fetchOp, alias);
+        MergeQueue mergeQueue = entry.getValue();
+        setUpFetchContexts(alias, mergeQueue);
       }
       firstFetchHappened = true;
       for (Byte t : order) {
@@ -556,7 +565,7 @@ public class SMBMapJoinOperator extends 
     //clean up
     for (Byte alias : order) {
       if(alias != (byte) posBigTable) {
-        fetchOpDone[alias] = false;
+        fetchDone[alias] = false;
       }
       foundNextKeyGroup[alias] = false;
     }
@@ -564,12 +573,12 @@ public class SMBMapJoinOperator extends 
     localWorkInited = false;
 
     super.closeOp(abort);
-    if (fetchOperators != null) {
-      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-        Operator<? extends OperatorDesc> forwardOp = localWork
-            .getAliasToWork().get(entry.getKey());
-        forwardOp.close(abort);
-      }
+    for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
+      String alias = entry.getKey();
+      MergeQueue mergeQueue = entry.getValue();
+      Operator forwardOp = localWork.getAliasToWork().get(alias);
+      forwardOp.close(abort);
+      mergeQueue.clearFetchContext();
     }
   }
 
@@ -592,4 +601,145 @@ public class SMBMapJoinOperator extends 
   public OperatorType getType() {
     return OperatorType.MAPJOIN;
   }
+
+  // returns rows from possibly multiple bucket files of the small table in ascending order
+  // by utilizing a priority queue (borrowed from hadoop)
+  // elements of the queue (Integer) are indexes into FetchOperator[] (segments)
+  private class MergeQueue extends PriorityQueue<Integer> {
+
+    private final String alias;
+    private final FetchWork fetchWork;
+    private final JobConf jobConf;
+
+    // for keeping track of the number of elements read. just for debugging
+    transient int counter;
+
+    transient FetchOperator[] segments;
+    transient List<ExprNodeEvaluator> keyFields;
+    transient List<ObjectInspector> keyFieldOIs;
+
+    // index of the FetchOperator which is providing the smallest one
+    transient Integer currentMinSegment;
+    transient ObjectPair<List<Object>, InspectableObject>[] keys;
+
+    public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf) {
+      this.alias = alias;
+      this.fetchWork = fetchWork;
+      this.jobConf = jobConf;
+    }
+
+    // paths = bucket files of the small table for the current bucket file of the big table
+    // initializes a FetchOperator for each file in paths, reusing FetchOperators if possible
+    // currently, the number of paths is always the same (bucket numbers are all the same over
+    // all partitions in a table).
+    // But if hive supports assigning a bucket number per partition, this can vary
+    public void setupContext(List<Path> paths) throws HiveException {
+      int segmentLen = paths.size();
+      FetchOperator[] segments = segmentsForSize(segmentLen);
+      for (int i = 0 ; i < segmentLen; i++) {
+        Path path = paths.get(i);
+        if (segments[i] == null) {
+          segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf));
+        }
+        segments[i].setupContext(Arrays.asList(path));
+      }
+      initialize(segmentLen);
+      for (int i = 0; i < segmentLen; i++) {
+        if (nextHive(i)) {
+          put(i);
+        }
+      }
+      counter = 0;
+    }
+
+    @SuppressWarnings("unchecked")
+    private FetchOperator[] segmentsForSize(int segmentLen) {
+      if (segments == null || segments.length < segmentLen) {
+        FetchOperator[] newSegments = new FetchOperator[segmentLen];
+        ObjectPair<List<Object>, InspectableObject>[] newKeys = new ObjectPair[segmentLen];
+        if (segments != null) {
+          System.arraycopy(segments, 0, newSegments, 0, segments.length);
+          System.arraycopy(keys, 0, newKeys, 0, keys.length);
+        }
+        segments = newSegments;
+        keys = newKeys;
+      }
+      return segments;
+    }
+
+    public void clearFetchContext() throws HiveException {
+      if (segments != null) {
+        for (FetchOperator op : segments) {
+          if (op != null) {
+            op.clearFetchContext();
+          }
+        }
+      }
+    }
+
+    protected boolean lessThan(Object a, Object b) {
+      return compareKeys(keys[(Integer) a].getFirst(), keys[(Integer)b].getFirst()) < 0;
+    }
+
+    public final InspectableObject getNextRow() throws IOException {
+      if (currentMinSegment != null) {
+        adjustPriorityQueue(currentMinSegment);
+      }
+      Integer current = top();
+      if (current == null) {
+        LOG.info("MergeQueue forwarded " + counter + " rows");
+        return null;
+      }
+      counter++;
+      return keys[currentMinSegment = current].getSecond();
+    }
+
+    private void adjustPriorityQueue(Integer current) throws IOException {
+      if (nextIO(current)) {
+        adjustTop();  // sort
+      } else {
+        pop();
+      }
+    }
+
+    // wrapping for exception handling
+    private boolean nextHive(Integer current) throws HiveException {
+      try {
+        return next(current);
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+    }
+
+    // wrapping for exception handling
+    private boolean nextIO(Integer current) throws IOException {
+      try {
+        return next(current);
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
+    }
+
+    // return true if the current min segment (FetchOperator) has a next row
+    private boolean next(Integer current) throws IOException, HiveException {
+      if (keyFields == null) {
+        // joinKeys/joinKeysOI are initialized after the merge queue is created, so set up lazily at runtime
+        byte tag = tagForAlias(alias);
+        keyFields = joinKeys.get(tag);
+        keyFieldOIs = joinKeysObjectInspectors.get(tag);
+      }
+      InspectableObject nextRow = segments[current].getNextRow();
+      if (nextRow != null) {
+        if (keys[current] == null) {
+          keys[current] = new ObjectPair<List<Object>, InspectableObject>();
+        }
+        // todo this should be changed to be evaluated lazily, especially for single segment case
+        keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs));
+        keys[current].setSecond(nextRow);
+        return true;
+      }
+      keys[current] = null;
+      return false;
+    }
+  }
 }
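
Note how MergeQueue queues segment indexes rather than rows: lessThan() compares the pre-computed join keys of each segment's current head, and getNextRow() re-sorts the previous winner in place with adjustTop() (or pop()s it once drained) before reading the new top(). A minimal sketch of the same idiom over int arrays, assuming Hadoop's org.apache.hadoop.util.PriorityQueue (the Lucene-style heap imported above) is on the classpath; the class and field names are invented:

import org.apache.hadoop.util.PriorityQueue;

public class IndexMergeSketch extends PriorityQueue<Integer> {
  private final int[][] segments; // each row: one sorted bucket file
  private final int[] cursors;    // read position per segment

  public IndexMergeSketch(int[][] segments) {
    this.segments = segments;
    this.cursors = new int[segments.length];
    initialize(segments.length);
    for (int i = 0; i < segments.length; i++) {
      if (segments[i].length > 0) {
        put(i);                   // the queue holds indexes, not values
      }
    }
  }

  @Override
  protected boolean lessThan(Object a, Object b) {
    int i = (Integer) a, j = (Integer) b;
    return segments[i][cursors[i]] < segments[j][cursors[j]];
  }

  // next value in ascending order, or null when all segments are drained
  public Integer next() {
    Integer min = top();
    if (min == null) {
      return null;
    }
    int value = segments[min][cursors[min]++];
    if (cursors[min] < segments[min].length) {
      adjustTop();                // head advanced; sift it back into place
    } else {
      pop();                      // this segment is exhausted
    }
    return value;
  }

  public static void main(String[] args) {
    IndexMergeSketch q = new IndexMergeSketch(
        new int[][]{{1, 4, 7}, {2, 5}, {3, 6, 8}});
    for (Integer v = q.next(); v != null; v = q.next()) {
      System.out.print(v + " "); // 1 2 3 4 5 6 7 8
    }
  }
}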

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Fri Sep  7 17:40:14 2012
@@ -242,15 +242,6 @@ public class SortedMergeBucketMapJoinOpt
       if (tso == null) {
         return false;
       }
-      if (pos != op.getConf().getPosBigTable()) {
-        // currently, a file from a big table can be joined with only 1 file from a small table
-        for (List<String> files :
-            op.getConf().getAliasBucketFileNameMapping().get(alias).values()) {
-          if (files != null && files.size() > 1) {
-            return false;
-          }
-        }
-      }
 
       List<ExprNodeDesc> keys = op.getConf().getKeys().get((byte) pos);
       // get all join columns from join keys stored in MapJoinDesc

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java Fri Sep  7 17:40:14 2012
@@ -193,6 +193,17 @@ public class FetchWork implements Serial
   }
 
   /**
+   * @return the partDescs for paths
+   */
+  public List<PartitionDesc> getPartDescs(List<Path> paths) {
+    List<PartitionDesc> parts = new ArrayList<PartitionDesc>(paths.size());
+    for (Path path : paths) {
+      parts.add(partDesc.get(partDir.indexOf(path.getParent().toString())));
+    }
+    return parts;
+  }
+
+  /**
    * @param partDesc
    *          the partDesc to set
    */
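
The lookup above maps each bucket file back to its PartitionDesc through partDir.indexOf(), which assumes Path.getParent().toString() matches the stored partition directory string exactly. A standalone restatement with plain strings standing in for PartitionDesc (the HDFS paths below are invented for illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;

public class PartDescLookupSketch {
  public static void main(String[] args) {
    // stand-ins for FetchWork.partDir / FetchWork.partDesc
    List<String> partDir = Arrays.asList(
        "hdfs://nn/warehouse/bucket_big/ds=2008-04-08",
        "hdfs://nn/warehouse/bucket_big/ds=2008-04-09");
    List<String> partDesc = Arrays.asList("desc(ds=2008-04-08)",
                                          "desc(ds=2008-04-09)");
    List<Path> paths = Arrays.asList(
        new Path("hdfs://nn/warehouse/bucket_big/ds=2008-04-09/srcsortbucket1outof4.txt"),
        new Path("hdfs://nn/warehouse/bucket_big/ds=2008-04-08/srcsortbucket2outof4.txt"));

    // same lookup as getPartDescs(): parent dir -> index -> PartitionDesc
    List<String> parts = new ArrayList<String>(paths.size());
    for (Path path : paths) {
      parts.add(partDesc.get(partDir.indexOf(path.getParent().toString())));
    }
    System.out.println(parts); // [desc(ds=2008-04-09), desc(ds=2008-04-08)]
  }
}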

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java?rev=1382098&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java Fri Sep  7 17:40:14 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+public class ObjectPair<F, S> {
+  private F first;
+  private S second;
+
+  public ObjectPair() {}
+
+  public ObjectPair(F first, S second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  public F getFirst() {
+    return first;
+  }
+
+  public void setFirst(F first) {
+    this.first = first;
+  }
+
+  public S getSecond() {
+    return second;
+  }
+
+  public void setSecond(S second) {
+    this.second = second;
+  }
+}
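
ObjectPair is a plain generic two-field holder; MergeQueue (above) uses it to carry each segment's computed join key together with the row it came from. A trivial usage sketch, with a String standing in for InspectableObject:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.ql.util.ObjectPair;

public class ObjectPairUsage {
  public static void main(String[] args) {
    ObjectPair<List<Object>, String> keyAndRow =
        new ObjectPair<List<Object>, String>();
    keyAndRow.setFirst(Arrays.<Object>asList("key1"));
    keyAndRow.setSecond("row payload"); // stands in for InspectableObject
    System.out.println(keyAndRow.getFirst() + " -> " + keyAndRow.getSecond());
  }
}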

Propchange: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q Fri Sep  7 17:40:14 2012
@@ -19,6 +19,5 @@ explain extended select /* + MAPJOIN(a) 
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q Fri Sep  7 17:40:14 2012
@@ -17,6 +17,5 @@ explain extended select /* + MAPJOIN(a) 
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
-
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q Fri Sep  7 17:40:14 2012
@@ -17,6 +17,5 @@ explain extended select /* + MAPJOIN(a) 
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
-
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q Fri Sep  7 17:40:14 2012
@@ -19,7 +19,5 @@ explain extended select /* + MAPJOIN(a) 
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
-
-
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q?rev=1382098&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q Fri Sep  7 17:40:14 2012
@@ -0,0 +1,18 @@
+-- small no part, 4 bucket & big no part, 2 bucket
+CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small;
+
+CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big;
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big;
+
+set hive.optimize.bucketmapjoin = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q?rev=1382098&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q Fri Sep  7 17:40:14 2012
@@ -0,0 +1,21 @@
+-- small no part, 4 bucket & big 2 part, 2 bucket
+CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small;
+
+CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+
+set hive.optimize.bucketmapjoin = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Modified: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out Fri Sep  7 17:40:14 2012
@@ -337,40 +337,11 @@ ABSTRACT SYNTAX TREE:
   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count)))))
 
 STAGE DEPENDENCIES:
-  Stage-4 is a root stage
-  Stage-1 depends on stages: Stage-4
+  Stage-1 is a root stage
   Stage-2 depends on stages: Stage-1
   Stage-0 is a root stage
 
 STAGE PLANS:
-  Stage: Stage-4
-    Map Reduce Local Work
-      Alias -> Map Local Tables:
-        a 
-          Fetch Operator
-            limit: -1
-      Alias -> Map Local Operator Tree:
-        a 
-          TableScan
-            alias: a
-            GatherStats: false
-            HashTable Sink Operator
-              condition expressions:
-                0 
-                1 
-              handleSkewJoin: false
-              keys:
-                0 [Column[key]]
-                1 [Column[key]]
-              Position of Big Table: 1
-      Bucket Mapjoin Context:
-          Alias Bucket Base File Name Mapping:
-            a {ds=2008-04-08/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-08/srcsortbucket3outof4.txt], ds=2008-04-08/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-08/srcsortbucket4outof4.txt], ds=2008-04-09/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-08/srcsortbucket3outof4.txt], ds=2008-04-09/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-08/srcsortbucket4outof4.txt]}
-          Alias Bucket File Name Mapping:
-#### A masked pattern was here ####
-          Alias Bucket Output File Name Mapping:
-#### A masked pattern was here ####
-
   Stage: Stage-1
     Map Reduce
       Alias -> Map Operator Tree:
@@ -378,7 +349,7 @@ STAGE PLANS:
           TableScan
             alias: b
             GatherStats: false
-            Map Join Operator
+            Sorted Merge Bucket Map Join Operator
               condition map:
                    Inner Join 0 to 1
               condition expressions:
@@ -404,8 +375,6 @@ STAGE PLANS:
                 TotalFiles: 1
                 GatherStats: false
                 MultiFileSpray: false
-      Local Work:
-        Map Reduce Local Work
       Needs Tagging: false
       Path -> Alias:
 #### A masked pattern was here ####
@@ -584,3 +553,16 @@ STAGE PLANS:
       limit: -1
 
 
+PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_big@ds=2008-04-08
+PREHOOK: Input: default@bucket_big@ds=2008-04-09
+PREHOOK: Input: default@bucket_small@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket_big@ds=2008-04-08
+POSTHOOK: Input: default@bucket_big@ds=2008-04-09
+POSTHOOK: Input: default@bucket_small@ds=2008-04-08
+#### A masked pattern was here ####
+928

Modified: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out Fri Sep  7 17:40:14 2012
@@ -287,40 +287,11 @@ ABSTRACT SYNTAX TREE:
   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count)))))
 
 STAGE DEPENDENCIES:
-  Stage-4 is a root stage
-  Stage-1 depends on stages: Stage-4
+  Stage-1 is a root stage
   Stage-2 depends on stages: Stage-1
   Stage-0 is a root stage
 
 STAGE PLANS:
-  Stage: Stage-4
-    Map Reduce Local Work
-      Alias -> Map Local Tables:
-        a 
-          Fetch Operator
-            limit: -1
-      Alias -> Map Local Operator Tree:
-        a 
-          TableScan
-            alias: a
-            GatherStats: false
-            HashTable Sink Operator
-              condition expressions:
-                0 
-                1 
-              handleSkewJoin: false
-              keys:
-                0 [Column[key]]
-                1 [Column[key]]
-              Position of Big Table: 1
-      Bucket Mapjoin Context:
-          Alias Bucket Base File Name Mapping:
-            a {ds=2008-04-08/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-09/srcsortbucket1outof4.txt], ds=2008-04-08/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-09/srcsortbucket2outof4.txt], ds=2008-04-08/srcsortbucket3outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-09/srcsortbucket1outof4.txt], ds=2008-04-08/srcsortbucket4outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-09/srcsortbucket2outof4.txt]}
-          Alias Bucket File Name Mapping:
-#### A masked pattern was here ####
-          Alias Bucket Output File Name Mapping:
-#### A masked pattern was here ####
-
   Stage: Stage-1
     Map Reduce
       Alias -> Map Operator Tree:
@@ -328,7 +299,7 @@ STAGE PLANS:
           TableScan
             alias: b
             GatherStats: false
-            Map Join Operator
+            Sorted Merge Bucket Map Join Operator
               condition map:
                    Inner Join 0 to 1
               condition expressions:
@@ -354,8 +325,6 @@ STAGE PLANS:
                 TotalFiles: 1
                 GatherStats: false
                 MultiFileSpray: false
-      Local Work:
-        Map Reduce Local Work
       Needs Tagging: false
       Path -> Alias:
 #### A masked pattern was here ####
@@ -484,3 +453,16 @@ STAGE PLANS:
       limit: -1
 
 
+PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_big@ds=2008-04-08
+PREHOOK: Input: default@bucket_small@ds=2008-04-08
+PREHOOK: Input: default@bucket_small@ds=2008-04-09
+#### A masked pattern was here ####
+POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket_big@ds=2008-04-08
+POSTHOOK: Input: default@bucket_small@ds=2008-04-08
+POSTHOOK: Input: default@bucket_small@ds=2008-04-09
+#### A masked pattern was here ####
+928

Modified: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out Fri Sep  7 17:40:14 2012
@@ -299,40 +299,11 @@ ABSTRACT SYNTAX TREE:
   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count)))))
 
 STAGE DEPENDENCIES:
-  Stage-4 is a root stage
-  Stage-1 depends on stages: Stage-4
+  Stage-1 is a root stage
   Stage-2 depends on stages: Stage-1
   Stage-0 is a root stage
 
 STAGE PLANS:
-  Stage: Stage-4
-    Map Reduce Local Work
-      Alias -> Map Local Tables:
-        a 
-          Fetch Operator
-            limit: -1
-      Alias -> Map Local Operator Tree:
-        a 
-          TableScan
-            alias: a
-            GatherStats: false
-            HashTable Sink Operator
-              condition expressions:
-                0 
-                1 
-              handleSkewJoin: false
-              keys:
-                0 [Column[key]]
-                1 [Column[key]]
-              Position of Big Table: 1
-      Bucket Mapjoin Context:
-          Alias Bucket Base File Name Mapping:
-            a {ds=2008-04-08/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-08/srcsortbucket3outof4.txt, ds=2008-04-09/srcsortbucket1outof4.txt, ds=2008-04-09/srcsortbucket3outof4.txt], ds=2008-04-08/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-08/srcsortbucket4outof4.txt, ds=2008-04-09/srcsortbucket2outof4.txt, ds=2008-04-09/srcsortbucket4outof4.txt]}
-          Alias Bucket File Name Mapping:
-#### A masked pattern was here ####
-          Alias Bucket Output File Name Mapping:
-#### A masked pattern was here ####
-
   Stage: Stage-1
     Map Reduce
       Alias -> Map Operator Tree:
@@ -340,7 +311,7 @@ STAGE PLANS:
           TableScan
             alias: b
             GatherStats: false
-            Map Join Operator
+            Sorted Merge Bucket Map Join Operator
               condition map:
                    Inner Join 0 to 1
               condition expressions:
@@ -366,8 +337,6 @@ STAGE PLANS:
                 TotalFiles: 1
                 GatherStats: false
                 MultiFileSpray: false
-      Local Work:
-        Map Reduce Local Work
       Needs Tagging: false
       Path -> Alias:
 #### A masked pattern was here ####
@@ -496,3 +465,16 @@ STAGE PLANS:
       limit: -1
 
 
+PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_big@ds=2008-04-08
+PREHOOK: Input: default@bucket_small@ds=2008-04-08
+PREHOOK: Input: default@bucket_small@ds=2008-04-09
+#### A masked pattern was here ####
+POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket_big@ds=2008-04-08
+POSTHOOK: Input: default@bucket_small@ds=2008-04-08
+POSTHOOK: Input: default@bucket_small@ds=2008-04-09
+#### A masked pattern was here ####
+928

Added: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out?rev=1382098&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out Fri Sep  7 17:40:14 2012
@@ -0,0 +1,441 @@
+PREHOOK: query: -- small no part, 4 bucket & big no part, 2 bucket
+CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- small no part, 4 bucket & big no part, 2 bucket
+CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@bucket_small
+PREHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small
+PREHOOK: type: LOAD
+PREHOOK: Output: default@bucket_small
+POSTHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@bucket_small
+PREHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small
+PREHOOK: type: LOAD
+PREHOOK: Output: default@bucket_small
+POSTHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@bucket_small
+PREHOOK: query: load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small
+PREHOOK: type: LOAD
+PREHOOK: Output: default@bucket_small
+POSTHOOK: query: load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@bucket_small
+PREHOOK: query: load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small
+PREHOOK: type: LOAD
+PREHOOK: Output: default@bucket_small
+POSTHOOK: query: load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@bucket_small
+PREHOOK: query: CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@bucket_big
+PREHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big
+PREHOOK: type: LOAD
+PREHOOK: Output: default@bucket_big
+POSTHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@bucket_big
+PREHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big
+PREHOOK: type: LOAD
+PREHOOK: Output: default@bucket_big
+POSTHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@bucket_big
+PREHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count)))))
+
+STAGE DEPENDENCIES:
+  Stage-4 is a root stage
+  Stage-1 depends on stages: Stage-4
+  Stage-2 depends on stages: Stage-1
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-4
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        a 
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        a 
+          TableScan
+            alias: a
+            GatherStats: false
+            HashTable Sink Operator
+              condition expressions:
+                0 
+                1 
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[key]]
+              Position of Big Table: 1
+      Bucket Mapjoin Context:
+          Alias Bucket Base File Name Mapping:
+            a {srcsortbucket1outof4.txt=[srcsortbucket1outof4.txt, srcsortbucket3outof4.txt], srcsortbucket2outof4.txt=[srcsortbucket2outof4.txt, srcsortbucket4outof4.txt]}
+          Alias Bucket File Name Mapping:
+#### A masked pattern was here ####
+          Alias Bucket Output File Name Mapping:
+#### A masked pattern was here ####
+
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        b 
+          TableScan
+            alias: b
+            GatherStats: false
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 
+                1 
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[key]]
+              Position of Big Table: 1
+              File Output Operator
+                compressed: false
+                GlobalTableId: 0
+#### A masked pattern was here ####
+                NumFilesPerFileSink: 1
+                table:
+                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                    properties:
+                      columns 
+                      columns.types 
+                      escape.delim \
+                TotalFiles: 1
+                GatherStats: false
+                MultiFileSpray: false
+      Local Work:
+        Map Reduce Local Work
+      Needs Tagging: false
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            base file name: bucket_big
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              SORTBUCKETCOLSPREFIX TRUE
+              bucket_count 2
+              bucket_field_name key
+              columns key,value
+              columns.types string:string
+#### A masked pattern was here ####
+              name default.bucket_big
+              numFiles 2
+              numPartitions 0
+              numRows 0
+              rawDataSize 0
+              serialization.ddl struct bucket_big { string key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              totalSize 2750
+#### A masked pattern was here ####
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                SORTBUCKETCOLSPREFIX TRUE
+                bucket_count 2
+                bucket_field_name key
+                columns key,value
+                columns.types string:string
+#### A masked pattern was here ####
+                name default.bucket_big
+                numFiles 2
+                numPartitions 0
+                numRows 0
+                rawDataSize 0
+                serialization.ddl struct bucket_big { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                totalSize 2750
+#### A masked pattern was here ####
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.bucket_big
+            name: default.bucket_big
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          Select Operator
+            Select Operator
+              Group By Operator
+                aggregations:
+                      expr: count()
+                bucketGroup: false
+                mode: hash
+                outputColumnNames: _col0
+                Reduce Output Operator
+                  sort order: 
+                  tag: -1
+                  value expressions:
+                        expr: _col0
+                        type: bigint
+      Needs Tagging: false
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            base file name: -mr-10002
+            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+            properties:
+              columns 
+              columns.types 
+              escape.delim \
+          
+              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+              properties:
+                columns 
+                columns.types 
+                escape.delim \
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: bigint
+            outputColumnNames: _col0
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+#### A masked pattern was here ####
+              NumFilesPerFileSink: 1
+#### A masked pattern was here ####
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  properties:
+                    columns _col0
+                    columns.types bigint
+                    escape.delim \
+                    serialization.format 1
+              TotalFiles: 1
+              GatherStats: false
+              MultiFileSpray: false
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
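The plan above takes the bucketed map join path: Stage-4 builds the small alias a into a local hash table through a Fetch Operator, and the Bucket Mapjoin Context maps each big-table bucket file to two small-table bucket files, the multiple-files-per-bucket small alias case this revision exercises. As a minimal sketch of the flags and query that yield a plan of this shape (an assumption; the verbatim DDL and settings live in the bucketcontext_*.q scripts in this revision):

    -- assumed sketch, not the verbatim test script
    set hive.optimize.bucketmapjoin = true;

    select /* + MAPJOIN(a) */ count(*)
    FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
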
+PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_big
+PREHOOK: Input: default@bucket_small
+#### A masked pattern was here ####
+POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket_big
+POSTHOOK: Input: default@bucket_small
+#### A masked pattern was here ####
+464
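The first run returns 464 through the hash-table map join. The second EXPLAIN below plans the identical query as a Sorted Merge Bucket Map Join instead; the test presumably enables that variant between the two runs with settings along these lines (an assumption, not quoted from the script):

    -- assumed flags for the sorted-merge variant
    set hive.optimize.bucketmapjoin = true;
    set hive.optimize.bucketmapjoin.sortedmerge = true;
    set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
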
+PREHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        b 
+          TableScan
+            alias: b
+            GatherStats: false
+            Sorted Merge Bucket Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 
+                1 
+              handleSkewJoin: false
+              keys:
+                0 [Column[key]]
+                1 [Column[key]]
+              Position of Big Table: 1
+              File Output Operator
+                compressed: false
+                GlobalTableId: 0
+#### A masked pattern was here ####
+                NumFilesPerFileSink: 1
+                table:
+                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                    properties:
+                      columns 
+                      columns.types 
+                      escape.delim \
+                TotalFiles: 1
+                GatherStats: false
+                MultiFileSpray: false
+      Needs Tagging: false
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            base file name: bucket_big
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              SORTBUCKETCOLSPREFIX TRUE
+              bucket_count 2
+              bucket_field_name key
+              columns key,value
+              columns.types string:string
+#### A masked pattern was here ####
+              name default.bucket_big
+              numFiles 2
+              numPartitions 0
+              numRows 0
+              rawDataSize 0
+              serialization.ddl struct bucket_big { string key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              totalSize 2750
+#### A masked pattern was here ####
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                SORTBUCKETCOLSPREFIX TRUE
+                bucket_count 2
+                bucket_field_name key
+                columns key,value
+                columns.types string:string
+#### A masked pattern was here ####
+                name default.bucket_big
+                numFiles 2
+                numPartitions 0
+                numRows 0
+                rawDataSize 0
+                serialization.ddl struct bucket_big { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                totalSize 2750
+#### A masked pattern was here ####
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.bucket_big
+            name: default.bucket_big
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          Select Operator
+            Select Operator
+              Group By Operator
+                aggregations:
+                      expr: count()
+                bucketGroup: false
+                mode: hash
+                outputColumnNames: _col0
+                Reduce Output Operator
+                  sort order: 
+                  tag: -1
+                  value expressions:
+                        expr: _col0
+                        type: bigint
+      Needs Tagging: false
+      Path -> Alias:
+#### A masked pattern was here ####
+      Path -> Partition:
+#### A masked pattern was here ####
+          Partition
+            base file name: -mr-10002
+            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+            properties:
+              columns 
+              columns.types 
+              escape.delim \
+          
+              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+              properties:
+                columns 
+                columns.types 
+                escape.delim \
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          mode: mergepartial
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: bigint
+            outputColumnNames: _col0
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+#### A masked pattern was here ####
+              NumFilesPerFileSink: 1
+#### A masked pattern was here ####
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  properties:
+                    columns _col0
+                    columns.types bigint
+                    escape.delim \
+                    serialization.format 1
+              TotalFiles: 1
+              GatherStats: false
+              MultiFileSpray: false
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+
+
+PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket_big
+PREHOOK: Input: default@bucket_small
+#### A masked pattern was here ####
+POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket_big
+POSTHOOK: Input: default@bucket_small
+#### A masked pattern was here ####
+464
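Both variants return 464, so the sorted merge bucket map join agrees with the hash-based map join even when the small alias has multiple files per bucket, which is the case this change handles. Note the structural difference between the two plans: the map join needs a root local-work stage (Stage-4) to build the hash table before Stage-1 runs, while the SMB plan starts directly at Stage-1 and merge-joins the pre-sorted bucket files of a inside each mapper over b.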