From: cws@apache.org
To: commits@hive.apache.org
Date: Fri, 07 Sep 2012 17:40:15 -0000
Subject: svn commit: r1382098 [1/2] - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/java/org/apache/hadoop/hive/ql/util/ ql/src/test/queries/client...
Message-Id: <20120907174016.A249423888FE@eris.apache.org>

Author: cws
Date: Fri Sep 7 17:40:14 2012
New Revision: 1382098

URL: http://svn.apache.org/viewvc?rev=1382098&view=rev
Log:
HIVE-3171. Bucketed sort merge join doesn't work when multiple files exist for small alias (Navis Ryu via cws)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java   (with props)
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_6.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q
    hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapperContext.java Fri Sep 7 17:40:14 2012
@@ -17,18 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOContext;
-import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
 
 public class ExecMapperContext {
 
@@ -110,26 +104,6 @@ public class ExecMapperContext {
     this.lastInputFile = lastInputFile;
   }
 
-  private void setUpFetchOpContext(FetchOperator fetchOp, String alias)
-      throws Exception {
-    String currentInputFile = HiveConf.getVar(jc,
-        HiveConf.ConfVars.HADOOPMAPFILENAME);
-    BucketMapJoinContext bucketMatcherCxt = this.localWork
-        .getBucketMapjoinContext();
-    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
-        .getBucketMatcherClass();
-    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
-        bucketMatcherCls, null);
-    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
-        .getAliasBucketFileNameMapping());
-
-    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
-        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
-  }
-
   public String getCurrentInputFile() {
     currentInputFile = this.ioCxt.getInputFile();
     return currentInputFile;
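HIVE-3171 in one picture: when bucket counts differ (or the small table is partitioned), one bucket file of the big table corresponds to several bucket files of the small alias, and the fetch side previously consumed only one of them. A minimal, self-contained Java sketch of that mapping step; the file names and the plain Map are hypothetical stand-ins for Hive's BucketMatcher and its alias bucket file name mapping, not the patch's actual types:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BucketMappingSketch {
      public static void main(String[] args) {
        // Hypothetical layout: big table has 2 buckets, small alias has 4, so
        // every big-table bucket file corresponds to 2 small-alias bucket files.
        Map<String, List<String>> aliasBucketFiles = new HashMap<String, List<String>>();
        aliasBucketFiles.put("bucket_big/000000_0",
            Arrays.asList("bucket_small/000000_0", "bucket_small/000002_0"));
        aliasBucketFiles.put("bucket_big/000001_0",
            Arrays.asList("bucket_small/000001_0", "bucket_small/000003_0"));

        // Before this patch the fetch side consumed exactly one small-alias file;
        // after it, the whole list is handed to a single merged reader.
        String currentInputFile = "bucket_big/000000_0";
        System.out.println("files to merge for the small alias: "
            + aliasBucketFiles.get(currentInputFile));
      }
    }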
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Fri Sep 7 17:40:14 2012
@@ -143,6 +143,10 @@ public class FetchOperator implements Se
     } else {
       isNativeTable = true;
     }
+    setupExecContext();
+  }
+
+  private void setupExecContext() {
     if (hasVC || work.getSplitSample() != null) {
       context = new ExecMapperContext();
       if (operator != null) {
@@ -536,6 +540,7 @@ public class FetchOperator implements Se
       context.clear();
       context = null;
     }
+    this.currTbl = null;
     this.currPath = null;
     this.iterPath = null;
     this.iterPartDesc = null;
@@ -546,21 +551,16 @@ public class FetchOperator implements Se
   }
 
   /**
-   * used for bucket map join. there is a hack for getting partitionDesc. bucket map join right now
-   * only allow one partition present in bucket map join.
+   * used for bucket map join
    */
-  public void setupContext(Iterator<Path> iterPath, Iterator<PartitionDesc> iterPartDesc) {
-    this.iterPath = iterPath;
-    this.iterPartDesc = iterPartDesc;
-    if (iterPartDesc == null) {
-      if (work.isNotPartitioned()) {
-        this.currTbl = work.getTblDesc();
-      } else {
-        // hack, get the first.
-        List<PartitionDesc> listParts = work.getPartDesc();
-        currPart = listParts.isEmpty() ? null : listParts.get(0);
-      }
+  public void setupContext(List<Path> paths) {
+    this.iterPath = paths.iterator();
+    if (work.isNotPartitioned()) {
+      this.currTbl = work.getTblDesc();
+    } else {
+      this.iterPartDesc = work.getPartDescs(paths).iterator();
     }
+    setupExecContext();
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Fri Sep 7 17:40:14 2012
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -434,8 +433,7 @@ public class MapredLocalTask extends Tas
 
     List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile, bucketMatcherCxt
         .getMapJoinBigTableAlias(), alias);
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
+    fetchOp.setupContext(aliasFiles);
   }
 
   @Override
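The signature change above is the enabler: FetchOperator.setupContext now receives the full List<Path>, so the callee can resolve a partition descriptor for every path via the new FetchWork.getPartDescs (further down in this mail) instead of grabbing the first partition, the old "hack, get the first". A rough sketch of that per-path resolution, with Strings standing in for Path and PartitionDesc; the names here are hypothetical, not Hive's API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SetupContextSketch {
      // Stand-in for FetchWork.getPartDescs: derive a partition descriptor from
      // each path's parent directory instead of taking the first partition only.
      static String partDescFor(String path) {
        return path.substring(0, path.lastIndexOf('/'));
      }

      public static void main(String[] args) {
        List<String> paths = Arrays.asList(
            "ds=2008-04-08/srcsortbucket1outof4.txt",
            "ds=2008-04-09/srcsortbucket1outof4.txt");
        List<String> partDescs = new ArrayList<String>(paths.size());
        for (String path : paths) {
          partDescs.add(partDescFor(path));
        }
        System.out.println(partDescs); // [ds=2008-04-08, ds=2008-04-09]
      }
    }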
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Sep 7 17:40:14 2012
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -38,12 +40,14 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.util.ObjectPair;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -58,14 +62,15 @@ public class SMBMapJoinOperator extends
       .getName());
 
   private MapredLocalWork localWork = null;
-  private Map<String, FetchOperator> fetchOperators;
+  private Map<String, MergeQueue> aliasToMergeQueue = Collections.emptyMap();
+
   transient ArrayList<Object>[] keyWritables;
   transient ArrayList<Object>[] nextKeyWritables;
   RowContainer<ArrayList<Object>>[] nextGroupStorage;
   RowContainer<ArrayList<Object>>[] candidateStorage;
 
   transient Map<Byte, String> tagToAlias;
-  private transient boolean[] fetchOpDone;
+  private transient boolean[] fetchDone;
   private transient boolean[] foundNextKeyGroup;
   transient boolean firstFetchHappened = false;
   private transient boolean inputFileChanged = false;
@@ -102,7 +107,7 @@ public class SMBMapJoinOperator extends
     candidateStorage = new RowContainer[maxAlias];
     keyWritables = new ArrayList[maxAlias];
     nextKeyWritables = new ArrayList[maxAlias];
-    fetchOpDone = new boolean[maxAlias];
+    fetchDone = new boolean[maxAlias];
     foundNextKeyGroup = new boolean[maxAlias];
 
     int bucketSize = HiveConf.getIntVar(hconf,
@@ -123,7 +128,7 @@ public class SMBMapJoinOperator extends
 
     for (Byte alias : order) {
       if(alias != (byte) posBigTable) {
-        fetchOpDone[alias] = false;
+        fetchDone[alias] = false;
       }
       foundNextKeyGroup[alias] = false;
     }
@@ -142,42 +147,51 @@ public class SMBMapJoinOperator extends
     }
     localWorkInited = true;
     this.localWork = localWork;
-    fetchOperators = new HashMap<String, FetchOperator>();
+    aliasToMergeQueue = new HashMap<String, MergeQueue>();
 
-    Map<FetchOperator, JobConf> fetchOpJobConfMap = new HashMap<FetchOperator, JobConf>();
     // create map local operators
-    for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
-        .entrySet()) {
-      JobConf jobClone = new JobConf(hconf);
-      Operator<? extends OperatorDesc> tableScan = localWork.getAliasToWork()
-          .get(entry.getKey());
-      if (tableScan instanceof TableScanOperator) {
-        ArrayList<Integer> list = ((TableScanOperator)tableScan).getNeededColumnIDs();
-        if (list != null) {
-          ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
-        }
-      } else {
-        ColumnProjectionUtils.setFullyReadColumns(jobClone);
+    Map<String, FetchWork> aliasToFetchWork = localWork.getAliasToFetchWork();
+    Map<String, Operator<? extends OperatorDesc>> aliasToWork = localWork.getAliasToWork();
+
+    for (Map.Entry<String, FetchWork> entry : aliasToFetchWork.entrySet()) {
+      String alias = entry.getKey();
+      FetchWork fetchWork = entry.getValue();
+
+      Operator<? extends OperatorDesc> forwardOp = aliasToWork.get(alias);
+      forwardOp.setExecContext(getExecContext());
+
+      JobConf jobClone = cloneJobConf(hconf, forwardOp);
+      FetchOperator fetchOp = new FetchOperator(fetchWork, jobClone);
+      forwardOp.initialize(jobClone, new ObjectInspector[]{fetchOp.getOutputObjectInspector()});
+      fetchOp.clearFetchContext();
+
+      MergeQueue mergeQueue = new MergeQueue(alias, fetchWork, jobClone);
+
+      aliasToMergeQueue.put(alias, mergeQueue);
+      l4j.info("fetch operators for " + alias + " initialized");
+    }
+  }
+
+  private JobConf cloneJobConf(Configuration hconf, Operator<? extends OperatorDesc> op) {
+    JobConf jobClone = new JobConf(hconf);
+    if (op instanceof TableScanOperator) {
+      List<Integer> list = ((TableScanOperator)op).getNeededColumnIDs();
+      if (list != null) {
+        ColumnProjectionUtils.appendReadColumnIDs(jobClone, list);
       }
-      FetchOperator fetchOp = new FetchOperator(entry.getValue(),jobClone);
-      fetchOpJobConfMap.put(fetchOp, jobClone);
-      fetchOperators.put(entry.getKey(), fetchOp);
-      l4j.info("fetchoperator for " + entry.getKey() + " created");
-    }
-
-    for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-      Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
-          .get(entry.getKey());
-      // All the operators need to be initialized before process
-      forwardOp.setExecContext(this.getExecContext());
-      FetchOperator fetchOp = entry.getValue();
-      JobConf jobConf = fetchOpJobConfMap.get(fetchOp);
-      if (jobConf == null) {
-        jobConf = this.getExecContext().getJc();
+    } else {
+      ColumnProjectionUtils.setFullyReadColumns(jobClone);
+    }
+    return jobClone;
+  }
+
+  private byte tagForAlias(String alias) {
+    for (Map.Entry<Byte, String> entry : tagToAlias.entrySet()) {
+      if (entry.getValue().equals(alias)) {
+        return entry.getKey();
       }
-      forwardOp.initialize(jobConf, new ObjectInspector[] {fetchOp.getOutputObjectInspector()});
-      l4j.info("fetchoperator for " + entry.getKey() + " initialized");
     }
+    return -1;
   }
 
   // The input file has changed - load the correct hash bucket
@@ -196,11 +210,10 @@ public class SMBMapJoinOperator extends
       joinFinalLeftData();
     }
     // set up the fetch operator for the new input file.
-    for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+    for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
       String alias = entry.getKey();
-      FetchOperator fetchOp = entry.getValue();
-      fetchOp.clearFetchContext();
-      setUpFetchOpContext(fetchOp, alias);
+      MergeQueue mergeQueue = entry.getValue();
+      setUpFetchContexts(alias, mergeQueue);
     }
     firstFetchHappened = false;
     inputFileChanged = false;
@@ -218,7 +231,6 @@ public class SMBMapJoinOperator extends
     }
 
     byte alias = (byte) tag;
-    // compute keys and values as StandardObjects
     // compute keys and values as StandardObjects
     ArrayList key = JoinUtil.computeKeys(row, joinKeys.get(alias),
@@ -268,24 +280,24 @@ public class SMBMapJoinOperator extends
 
   private void joinFinalLeftData() throws HiveException {
     RowContainer bigTblRowContainer = this.candidateStorage[this.posBigTable];
-    boolean allFetchOpDone = allFetchOpDone();
+    boolean allFetchDone = allFetchDone();
     // if all left data in small tables is less than or equal to the left data
     // in the big table, let them catch up
     while (bigTblRowContainer != null && bigTblRowContainer.size() > 0
-        && !allFetchOpDone) {
+        && !allFetchDone) {
       joinOneGroup();
       bigTblRowContainer = this.candidateStorage[this.posBigTable];
-      allFetchOpDone = allFetchOpDone();
+      allFetchDone = allFetchDone();
     }
 
-    while (!allFetchOpDone) {
+    while (!allFetchDone) {
       List ret = joinOneGroup();
       if (ret == null || ret.size() == 0) {
        break;
      }
      reportProgress();
      numMapRowsRead++;
-      allFetchOpDone = allFetchOpDone();
+      allFetchDone = allFetchDone();
     }
 
     boolean dataInCache = true;
@@ -307,15 +319,15 @@
     }
   }
 
-  private boolean allFetchOpDone() {
-    boolean allFetchOpDone = true;
+  private boolean allFetchDone() {
+    boolean allFetchDone = true;
     for (Byte tag : order) {
       if(tag == (byte) posBigTable) {
         continue;
       }
-      allFetchOpDone = allFetchOpDone && fetchOpDone[tag];
+      allFetchDone = allFetchDone && fetchDone[tag];
     }
-    return allFetchOpDone;
+    return allFetchDone;
   }
 
   private List joinOneGroup() throws HiveException {
@@ -381,12 +393,12 @@ public class SMBMapJoinOperator extends
     //for tables other than the big table, we need to fetch more data until reach a new group or done.
     while (!foundNextKeyGroup[t]) {
-      if (fetchOpDone[t]) {
+      if (fetchDone[t]) {
         break;
       }
       fetchOneRow(t);
     }
-    if (!foundNextKeyGroup[t] && fetchOpDone[t]) {
+    if (!foundNextKeyGroup[t] && fetchDone[t]) {
       this.nextKeyWritables[t] = null;
     }
   }
@@ -400,10 +412,10 @@
     this.nextGroupStorage[t] = oldRowContainer;
   }
 
-  private int compareKeys (ArrayList k1, ArrayList k2) {
+  private int compareKeys (List<Object> k1, List<Object> k2) {
     int ret = 0;
 
-    // join keys have difference sizes?
+    // join keys have different sizes?
     ret = k1.size() - k2.size();
     if (ret != 0) {
       return ret;
     }
@@ -475,53 +487,51 @@
     }
   }
 
-  private void setUpFetchOpContext(FetchOperator fetchOp, String alias) {
+  private void setUpFetchContexts(String alias, MergeQueue mergeQueue) throws HiveException {
+    mergeQueue.clearFetchContext();
+
     String currentInputFile = getExecContext().getCurrentInputFile();
-    BucketMapJoinContext bucketMatcherCxt = localWork.getBucketMapjoinContext();
-    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
-        .getBucketMatcherClass();
-    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
-        bucketMatcherCls, null);
+
+    BucketMapJoinContext bucketMatcherCxt = localWork.getBucketMapjoinContext();
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt.getBucketMatcherClass();
+    BucketMatcher bucketMatcher = ReflectionUtils.newInstance(bucketMatcherCls, null);
 
     getExecContext().setFileId(bucketMatcherCxt.createFileId(currentInputFile));
     LOG.info("set task id: " + getExecContext().getFileId());
 
     bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
         .getAliasBucketFileNameMapping());
+
     List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
         bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
-    Iterator<Path> iter = aliasFiles.iterator();
-    fetchOp.setupContext(iter, null);
+
+    mergeQueue.setupContext(aliasFiles);
   }
 
   private void fetchOneRow(byte tag) {
-    if (fetchOperators != null) {
-      String tble = this.tagToAlias.get(tag);
-      FetchOperator fetchOp = fetchOperators.get(tble);
+    String table = tagToAlias.get(tag);
+    MergeQueue mergeQueue = aliasToMergeQueue.get(table);
 
-      Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
-          .get(tble);
-      try {
-        InspectableObject row = fetchOp.getNextRow();
-        if (row == null) {
-          this.fetchOpDone[tag] = true;
-          return;
-        }
-        forwardOp.process(row.o, 0);
-        // check if any operator had a fatal error or early exit during
-        // execution
-        if (forwardOp.getDone()) {
-          this.fetchOpDone[tag] = true;
-        }
-      } catch (Throwable e) {
-        if (e instanceof OutOfMemoryError) {
-          // Don't create a new object if we are already out of memory
-          throw (OutOfMemoryError) e;
-        } else {
-          throw new RuntimeException("Map local work failed", e);
-        }
+    Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork()
+        .get(table);
+    try {
+      InspectableObject row = mergeQueue.getNextRow();
+      if (row == null) {
+        fetchDone[tag] = true;
+        return;
+      }
+      forwardOp.process(row.o, 0);
+      // check if any operator had a fatal error or early exit during
+      // execution
+      if (forwardOp.getDone()) {
+        fetchDone[tag] = true;
+      }
+    } catch (Throwable e) {
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Map local work failed", e);
      }
    }
  }
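compareKeys above orders two join keys by size first, then element by element; the MergeQueue added at the end of this file relies on it to decide which file segment currently holds the smallest key. A simplified, runnable analogue over Strings (the real method compares WritableComparables through WritableComparator):

    import java.util.Arrays;
    import java.util.List;

    public class CompareKeysSketch {
      static int compareKeys(List<String> k1, List<String> k2) {
        // keys of different sizes are ordered by size, as in the patch
        int ret = k1.size() - k2.size();
        if (ret != 0) {
          return ret;
        }
        for (int i = 0; i < k1.size(); i++) {
          String a = k1.get(i);
          String b = k2.get(i);
          if (a == null && b == null) {
            continue;          // treat two nulls as equal, keep comparing
          }
          if (a == null) {
            return -1;         // nulls sort first
          }
          if (b == null) {
            return 1;
          }
          ret = a.compareTo(b);
          if (ret != 0) {
            return ret;
          }
        }
        return 0;
      }

      public static void main(String[] args) {
        System.out.println(compareKeys(Arrays.asList("103"), Arrays.asList("104")) < 0);  // true
        System.out.println(compareKeys(Arrays.asList("103"), Arrays.asList("103")) == 0); // true
      }
    }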
@@ -536,11 +546,10 @@ public class SMBMapJoinOperator extends
 
     if (inputFileChanged || !firstFetchHappened) {
       //set up the fetch operator for the new input file.
-      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+      for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
         String alias = entry.getKey();
-        FetchOperator fetchOp = entry.getValue();
-        fetchOp.clearFetchContext();
-        setUpFetchOpContext(fetchOp, alias);
+        MergeQueue mergeQueue = entry.getValue();
+        setUpFetchContexts(alias, mergeQueue);
       }
       firstFetchHappened = true;
       for (Byte t : order) {
@@ -556,7 +565,7 @@ public class SMBMapJoinOperator extends
     //clean up
     for (Byte alias : order) {
       if(alias != (byte) posBigTable) {
-        fetchOpDone[alias] = false;
+        fetchDone[alias] = false;
       }
       foundNextKeyGroup[alias] = false;
     }
@@ -564,12 +573,12 @@ public class SMBMapJoinOperator extends
     localWorkInited = false;
 
     super.closeOp(abort);
-    if (fetchOperators != null) {
-      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-        Operator<? extends OperatorDesc> forwardOp = localWork
-            .getAliasToWork().get(entry.getKey());
-        forwardOp.close(abort);
-      }
+    for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
+      String alias = entry.getKey();
+      MergeQueue mergeQueue = entry.getValue();
+      Operator<? extends OperatorDesc> forwardOp = localWork.getAliasToWork().get(alias);
+      forwardOp.close(abort);
+      mergeQueue.clearFetchContext();
     }
   }
 
@@ -592,4 +601,145 @@ public class SMBMapJoinOperator extends
   public OperatorType getType() {
     return OperatorType.MAPJOIN;
   }
+
+  // returns rows from possibly multiple bucket files of small table in ascending order
+  // by utilizing priority queue (borrowed from hadoop)
+  // elements of queue (Integer) are indexes into FetchOperator[] (segments)
+  private class MergeQueue extends PriorityQueue<Integer> {
+
+    private final String alias;
+    private final FetchWork fetchWork;
+    private final JobConf jobConf;
+
+    // for keeping track of the number of elements read. just for debugging
+    transient int counter;
+
+    transient FetchOperator[] segments;
+    transient List<ExprNodeEvaluator> keyFields;
+    transient List<ObjectInspector> keyFieldOIs;
+
+    // index of the FetchOperator which is currently providing the smallest key
+    transient Integer currentMinSegment;
+    transient ObjectPair<ArrayList<Object>, InspectableObject>[] keys;
+
+    public MergeQueue(String alias, FetchWork fetchWork, JobConf jobConf) {
+      this.alias = alias;
+      this.fetchWork = fetchWork;
+      this.jobConf = jobConf;
+    }
+
+    // paths = bucket files of small table for current bucket file of big table
+    // initializes a FetchOperator for each file in paths, reuses FetchOperator if possible
+    // currently, number of paths is always the same (bucket numbers are all the same over
+    // all partitions in a table). But if Hive supports assigning bucket numbers
+    // per partition, this can vary.
+    public void setupContext(List<Path> paths) throws HiveException {
+      int segmentLen = paths.size();
+      FetchOperator[] segments = segmentsForSize(segmentLen);
+      for (int i = 0 ; i < segmentLen; i++) {
+        Path path = paths.get(i);
+        if (segments[i] == null) {
+          segments[i] = new FetchOperator(fetchWork, new JobConf(jobConf));
+        }
+        segments[i].setupContext(Arrays.asList(path));
+      }
+      initialize(segmentLen);
+      for (int i = 0; i < segmentLen; i++) {
+        if (nextHive(i)) {
+          put(i);
+        }
+      }
+      counter = 0;
+    }
+
+    @SuppressWarnings("unchecked")
+    private FetchOperator[] segmentsForSize(int segmentLen) {
+      if (segments == null || segments.length < segmentLen) {
+        FetchOperator[] newSegments = new FetchOperator[segmentLen];
+        ObjectPair<ArrayList<Object>, InspectableObject>[] newKeys = new ObjectPair[segmentLen];
+        if (segments != null) {
+          System.arraycopy(segments, 0, newSegments, 0, segments.length);
+          System.arraycopy(keys, 0, newKeys, 0, keys.length);
+        }
+        segments = newSegments;
+        keys = newKeys;
+      }
+      return segments;
+    }
+
+    public void clearFetchContext() throws HiveException {
+      if (segments != null) {
+        for (FetchOperator op : segments) {
+          if (op != null) {
+            op.clearFetchContext();
+          }
+        }
+      }
+    }
+
+    protected boolean lessThan(Object a, Object b) {
+      return compareKeys(keys[(Integer) a].getFirst(), keys[(Integer)b].getFirst()) < 0;
+    }
+
+    public final InspectableObject getNextRow() throws IOException {
+      if (currentMinSegment != null) {
+        adjustPriorityQueue(currentMinSegment);
+      }
+      Integer current = top();
+      if (current == null) {
+        LOG.info("MergeQueue forwarded " + counter + " rows");
+        return null;
+      }
+      counter++;
+      return keys[currentMinSegment = current].getSecond();
+    }
+
+    private void adjustPriorityQueue(Integer current) throws IOException {
+      if (nextIO(current)) {
+        adjustTop();  // sort
+      } else {
+        pop();
+      }
+    }
+
+    // wrapping for exception handling
+    private boolean nextHive(Integer current) throws HiveException {
+      try {
+        return next(current);
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
+    }
+
+    // wrapping for exception handling
+    private boolean nextIO(Integer current) throws IOException {
+      try {
+        return next(current);
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
+    }
+
+    // return true if current min segment(FetchOperator) has next row
+    private boolean next(Integer current) throws IOException, HiveException {
+      if (keyFields == null) {
+        // joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime
+        byte tag = tagForAlias(alias);
+        keyFields = joinKeys.get(tag);
+        keyFieldOIs = joinKeysObjectInspectors.get(tag);
+      }
+      InspectableObject nextRow = segments[current].getNextRow();
+      if (nextRow != null) {
+        if (keys[current] == null) {
+          keys[current] = new ObjectPair<ArrayList<Object>, InspectableObject>();
+        }
+        // todo this should be changed to be evaluated lazily, especially for single segment case
+        keys[current].setFirst(JoinUtil.computeKeys(nextRow.o, keyFields, keyFieldOIs));
+        keys[current].setSecond(nextRow);
+        return true;
+      }
+      keys[current] = null;
+      return false;
+    }
+  }
 }
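The MergeQueue just added is the core of the fix: it turns N sorted bucket files into a single sorted row stream by keeping each segment's current key in a heap and always forwarding the smallest one. A compact, runnable analogue of that k-way merge using java.util.PriorityQueue over sorted in-memory lists; Hive's version instead extends org.apache.hadoop.util.PriorityQueue and pulls rows from reusable FetchOperator segments:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeQueueSketch {

      // A heap entry: a segment's iterator plus the row currently at its head.
      private static class Head {
        final Iterator<String> segment;
        final String row;
        Head(Iterator<String> segment, String row) {
          this.segment = segment;
          this.row = row;
        }
      }

      public static List<String> merge(List<List<String>> sortedSegments) {
        PriorityQueue<Head> heap = new PriorityQueue<Head>(
            Math.max(1, sortedSegments.size()),
            new Comparator<Head>() {
              public int compare(Head a, Head b) {
                return a.row.compareTo(b.row);  // smallest key on top
              }
            });
        for (List<String> segment : sortedSegments) {
          Iterator<String> it = segment.iterator();
          if (it.hasNext()) {
            heap.add(new Head(it, it.next()));
          }
        }
        List<String> out = new ArrayList<String>();
        while (!heap.isEmpty()) {
          Head min = heap.poll();        // segment holding the smallest head
          out.add(min.row);
          if (min.segment.hasNext()) {   // refill from the same segment
            heap.add(new Head(min.segment, min.segment.next()));
          }
        }
        return out;
      }

      public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            Arrays.asList("0", "103", "169"),     // bucket file 1, sorted
            Arrays.asList("10", "103", "484")))); // bucket file 2, sorted
        // prints [0, 10, 103, 103, 169, 484]
      }
    }

Re-adding the drained segment's next head after each poll is what the patch's adjustPriorityQueue does with adjustTop()/pop().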
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Fri Sep 7 17:40:14 2012
@@ -242,15 +242,6 @@ public class SortedMergeBucketMapJoinOpt
     if (tso == null) {
       return false;
     }
-    if (pos != op.getConf().getPosBigTable()) {
-      // currently, a file from a big table can be joined with only 1 file from a small table
-      for (List<String> files :
-          op.getConf().getAliasBucketFileNameMapping().get(alias).values()) {
-        if (files != null && files.size() > 1) {
-          return false;
-        }
-      }
-    }
 
     List keys = op.getConf().getKeys().get((byte) pos);
     // get all join columns from join keys stored in MapJoinDesc

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java Fri Sep 7 17:40:14 2012
@@ -193,6 +193,17 @@ public class FetchWork implements Serial
   }
 
   /**
+   * @return the partDescs for paths
+   */
+  public List<PartitionDesc> getPartDescs(List<Path> paths) {
+    List<PartitionDesc> parts = new ArrayList<PartitionDesc>(paths.size());
+    for (Path path : paths) {
+      parts.add(partDesc.get(partDir.indexOf(path.getParent().toString())));
+    }
+    return parts;
+  }
+
+  /**
    * @param partDesc
    *          the partDesc to set
    */

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java?rev=1382098&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java Fri Sep 7 17:40:14 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+public class ObjectPair<F, S> {
+  private F first;
+  private S second;
+
+  public ObjectPair() {}
+
+  public ObjectPair(F first, S second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  public F getFirst() {
+    return first;
+  }
+
+  public void setFirst(F first) {
+    this.first = first;
+  }
+
+  public S getSecond() {
+    return second;
+  }
+
+  public void setSecond(S second) {
+    this.second = second;
+  }
+}

Propchange: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/ObjectPair.java
------------------------------------------------------------------------------
    svn:eol-style = native
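ObjectPair exists so MergeQueue can cache, per segment, the computed join key together with the row it was read from (its keys[] array). Illustrative usage, assuming the class added above is on the classpath:

    import java.util.ArrayList;
    import org.apache.hadoop.hive.ql.util.ObjectPair;

    public class ObjectPairExample {
      public static void main(String[] args) {
        // the computed join key, kept together with the row it came from
        ArrayList<Object> key = new ArrayList<Object>();
        key.add("103");
        Object[] row = {"103", "val_103"};
        ObjectPair<ArrayList<Object>, Object[]> pair =
            new ObjectPair<ArrayList<Object>, Object[]>(key, row);
        System.out.println(pair.getFirst());      // [103]
        System.out.println(pair.getSecond()[1]);  // val_103
      }
    }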
Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_1.q Fri Sep 7 17:40:14 2012
@@ -19,6 +19,5 @@ explain extended select /* + MAPJOIN(a)
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_2.q Fri Sep 7 17:40:14 2012
@@ -17,6 +17,5 @@ explain extended select /* + MAPJOIN(a)
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
-
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_3.q Fri Sep 7 17:40:14 2012
@@ -17,6 +17,5 @@ explain extended select /* + MAPJOIN(a)
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
-
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_4.q Fri Sep 7 17:40:14 2012
@@ -19,7 +19,5 @@ explain extended select /* + MAPJOIN(a)
 select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
 
 set hive.optimize.bucketmapjoin.sortedmerge = true;
-set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
-
-
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q?rev=1382098&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_5.q Fri Sep 7 17:40:14 2012
@@ -0,0 +1,18 @@
+-- small no part, 4 bucket & big no part, 2 bucket
+CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small;
+
+CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big;
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big;
+
+set hive.optimize.bucketmapjoin = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q?rev=1382098&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketcontext_6.q Fri Sep 7 17:40:14 2012
@@ -0,0 +1,21 @@
+-- small no part, 4 bucket & big 2 part, 2 bucket
+CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small;
+load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small;
+
+CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+
+set hive.optimize.bucketmapjoin = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;

Modified: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out?rev=1382098&r1=1382097&r2=1382098&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out Fri Sep 7 17:40:14 2012
@@ -337,40 +337,11 @@ ABSTRACT SYNTAX TREE:
   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (.
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) STAGE DEPENDENCIES: - Stage-4 is a root stage - Stage-1 depends on stages: Stage-4 + Stage-1 is a root stage Stage-2 depends on stages: Stage-1 Stage-0 is a root stage STAGE PLANS: - Stage: Stage-4 - Map Reduce Local Work - Alias -> Map Local Tables: - a - Fetch Operator - limit: -1 - Alias -> Map Local Operator Tree: - a - TableScan - alias: a - GatherStats: false - HashTable Sink Operator - condition expressions: - 0 - 1 - handleSkewJoin: false - keys: - 0 [Column[key]] - 1 [Column[key]] - Position of Big Table: 1 - Bucket Mapjoin Context: - Alias Bucket Base File Name Mapping: - a {ds=2008-04-08/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-08/srcsortbucket3outof4.txt], ds=2008-04-08/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-08/srcsortbucket4outof4.txt], ds=2008-04-09/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-08/srcsortbucket3outof4.txt], ds=2008-04-09/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-08/srcsortbucket4outof4.txt]} - Alias Bucket File Name Mapping: -#### A masked pattern was here #### - Alias Bucket Output File Name Mapping: -#### A masked pattern was here #### - Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: @@ -378,7 +349,7 @@ STAGE PLANS: TableScan alias: b GatherStats: false - Map Join Operator + Sorted Merge Bucket Map Join Operator condition map: Inner Join 0 to 1 condition expressions: @@ -404,8 +375,6 @@ STAGE PLANS: TotalFiles: 1 GatherStats: false MultiFileSpray: false - Local Work: - Map Reduce Local Work Needs Tagging: false Path -> Alias: #### A masked pattern was here #### @@ -584,3 +553,16 @@ STAGE PLANS: limit: -1 +PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_big@ds=2008-04-08 +PREHOOK: Input: default@bucket_big@ds=2008-04-09 +PREHOOK: Input: default@bucket_small@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucket_big@ds=2008-04-08 +POSTHOOK: Input: default@bucket_big@ds=2008-04-09 +POSTHOOK: Input: default@bucket_small@ds=2008-04-08 +#### A masked pattern was here #### +928 Modified: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out?rev=1382098&r1=1382097&r2=1382098&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out Fri Sep 7 17:40:14 2012 @@ -287,40 +287,11 @@ ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) STAGE DEPENDENCIES: - Stage-4 is a root stage - Stage-1 depends on stages: Stage-4 + Stage-1 is a root stage Stage-2 depends on stages: Stage-1 Stage-0 is a root stage STAGE PLANS: - Stage: Stage-4 - Map Reduce Local Work - Alias -> Map Local Tables: - a - Fetch Operator - limit: -1 - Alias -> Map Local Operator Tree: - a - TableScan - alias: a - GatherStats: false - HashTable Sink Operator - condition expressions: - 0 - 1 - handleSkewJoin: false - keys: - 0 [Column[key]] - 1 [Column[key]] - Position of Big Table: 1 - Bucket Mapjoin Context: - Alias Bucket Base File Name Mapping: - a {ds=2008-04-08/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-09/srcsortbucket1outof4.txt], ds=2008-04-08/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-09/srcsortbucket2outof4.txt], ds=2008-04-08/srcsortbucket3outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-09/srcsortbucket1outof4.txt], ds=2008-04-08/srcsortbucket4outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-09/srcsortbucket2outof4.txt]} - Alias Bucket File Name Mapping: -#### A masked pattern was here #### - Alias Bucket Output File Name Mapping: -#### A masked pattern was here #### - Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: @@ -328,7 +299,7 @@ STAGE PLANS: TableScan alias: b GatherStats: false - Map Join Operator + Sorted Merge Bucket Map Join Operator condition map: Inner Join 0 to 1 condition expressions: @@ -354,8 +325,6 @@ STAGE PLANS: TotalFiles: 1 GatherStats: false MultiFileSpray: false - Local Work: - Map Reduce Local Work Needs Tagging: false Path -> Alias: #### A masked pattern was here #### @@ -484,3 +453,16 @@ STAGE PLANS: limit: -1 +PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_big@ds=2008-04-08 +PREHOOK: Input: default@bucket_small@ds=2008-04-08 +PREHOOK: Input: default@bucket_small@ds=2008-04-09 +#### A masked pattern was here #### +POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucket_big@ds=2008-04-08 +POSTHOOK: Input: default@bucket_small@ds=2008-04-08 +POSTHOOK: Input: default@bucket_small@ds=2008-04-09 +#### A masked pattern was here #### +928 Modified: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out?rev=1382098&r1=1382097&r2=1382098&view=diff ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out (original) +++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out Fri Sep 7 17:40:14 2012 @@ -299,40 +299,11 @@ ABSTRACT SYNTAX TREE: (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) STAGE DEPENDENCIES: - Stage-4 is a root stage - Stage-1 depends on stages: Stage-4 + Stage-1 is a root stage Stage-2 depends on stages: Stage-1 Stage-0 is a root stage STAGE PLANS: - Stage: Stage-4 - Map Reduce Local Work - Alias -> Map Local Tables: - a - Fetch Operator - limit: -1 - Alias -> Map Local Operator Tree: - a - TableScan - alias: a - GatherStats: false - HashTable Sink Operator - condition expressions: - 0 - 1 - handleSkewJoin: false - keys: - 0 [Column[key]] - 1 [Column[key]] - Position of Big Table: 1 - Bucket Mapjoin Context: - Alias Bucket Base File Name Mapping: - a {ds=2008-04-08/srcsortbucket1outof4.txt=[ds=2008-04-08/srcsortbucket1outof4.txt, ds=2008-04-08/srcsortbucket3outof4.txt, ds=2008-04-09/srcsortbucket1outof4.txt, ds=2008-04-09/srcsortbucket3outof4.txt], ds=2008-04-08/srcsortbucket2outof4.txt=[ds=2008-04-08/srcsortbucket2outof4.txt, ds=2008-04-08/srcsortbucket4outof4.txt, ds=2008-04-09/srcsortbucket2outof4.txt, ds=2008-04-09/srcsortbucket4outof4.txt]} - Alias Bucket File Name Mapping: -#### A masked pattern was here #### - Alias Bucket Output File Name Mapping: -#### A masked pattern was here #### - Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: @@ -340,7 +311,7 @@ STAGE PLANS: TableScan alias: b GatherStats: false - Map Join Operator + Sorted Merge Bucket Map Join Operator condition map: Inner Join 0 to 1 condition expressions: @@ -366,8 +337,6 @@ STAGE PLANS: TotalFiles: 1 GatherStats: false MultiFileSpray: false - Local Work: - Map Reduce Local Work Needs Tagging: false Path -> Alias: #### A masked pattern was here #### @@ -496,3 +465,16 @@ STAGE PLANS: limit: -1 +PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_big@ds=2008-04-08 +PREHOOK: Input: default@bucket_small@ds=2008-04-08 +PREHOOK: Input: default@bucket_small@ds=2008-04-09 +#### A masked pattern was here #### +POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucket_big@ds=2008-04-08 +POSTHOOK: Input: default@bucket_small@ds=2008-04-08 +POSTHOOK: Input: default@bucket_small@ds=2008-04-09 +#### A masked pattern was here #### +928 Added: hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out?rev=1382098&view=auto ============================================================================== --- hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out (added) +++ hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out Fri Sep 7 17:40:14 2012 @@ -0,0 +1,441 @@ +PREHOOK: query: -- small no part, 4 bucket & big no part, 2 bucket +CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- small no part, 4 bucket & big no part, 2 bucket +CREATE TABLE bucket_small (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@bucket_small +PREHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small +PREHOOK: type: LOAD 
+PREHOOK: Output: default@bucket_small +POSTHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_small +POSTHOOK: type: LOAD +POSTHOOK: Output: default@bucket_small +PREHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small +PREHOOK: type: LOAD +PREHOOK: Output: default@bucket_small +POSTHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_small +POSTHOOK: type: LOAD +POSTHOOK: Output: default@bucket_small +PREHOOK: query: load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small +PREHOOK: type: LOAD +PREHOOK: Output: default@bucket_small +POSTHOOK: query: load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_small +POSTHOOK: type: LOAD +POSTHOOK: Output: default@bucket_small +PREHOOK: query: load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small +PREHOOK: type: LOAD +PREHOOK: Output: default@bucket_small +POSTHOOK: query: load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_small +POSTHOOK: type: LOAD +POSTHOOK: Output: default@bucket_small +PREHOOK: query: CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE bucket_big (key string, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@bucket_big +PREHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big +PREHOOK: type: LOAD +PREHOOK: Output: default@bucket_big +POSTHOOK: query: load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big +POSTHOOK: type: LOAD +POSTHOOK: Output: default@bucket_big +PREHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big +PREHOOK: type: LOAD +PREHOOK: Output: default@bucket_big +POSTHOOK: query: load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big +POSTHOOK: type: LOAD +POSTHOOK: Output: default@bucket_big +PREHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-1 depends on stages: Stage-4 + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-4 + Map Reduce Local Work + Alias -> Map Local Tables: + a + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + a + TableScan + alias: a + GatherStats: false + HashTable Sink Operator + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + Bucket Mapjoin Context: + Alias Bucket Base File Name Mapping: + a {srcsortbucket1outof4.txt=[srcsortbucket1outof4.txt, srcsortbucket3outof4.txt], srcsortbucket2outof4.txt=[srcsortbucket2outof4.txt, srcsortbucket4outof4.txt]} + Alias Bucket File Name Mapping: +#### A masked pattern was here #### + Alias Bucket Output File Name Mapping: +#### A masked pattern was here #### + + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + b + TableScan + alias: b + GatherStats: false + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Local Work: + Map Reduce Local Work + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: bucket_big + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 2 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.bucket_big + numFiles 2 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct bucket_big { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 2750 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 2 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.bucket_big + numFiles 2 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct bucket_big { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 2750 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.bucket_big + name: default.bucket_big + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + Select Operator + Group By Operator + aggregations: + expr: count() + 
bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0 + columns.types bigint + escape.delim \ + serialization.format 1 + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_big +PREHOOK: Input: default@bucket_small +#### A masked pattern was here #### +POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucket_big +POSTHOOK: Input: default@bucket_small +#### A masked pattern was here #### +464 +PREHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +PREHOOK: type: QUERY +POSTHOOK: query: explain extended select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME bucket_small) a) (TOK_TABREF (TOK_TABNAME bucket_big) b) (= (. (TOK_TABLE_OR_COL a) key) (. 
(TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST a))) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + b + TableScan + alias: b + GatherStats: false + Sorted Merge Bucket Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 + handleSkewJoin: false + keys: + 0 [Column[key]] + 1 [Column[key]] + Position of Big Table: 1 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: bucket_big + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 2 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.bucket_big + numFiles 2 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct bucket_big { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 2750 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + SORTBUCKETCOLSPREFIX TRUE + bucket_count 2 + bucket_field_name key + columns key,value + columns.types string:string +#### A masked pattern was here #### + name default.bucket_big + numFiles 2 + numPartitions 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct bucket_big { string key, string value} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 2750 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.bucket_big + name: default.bucket_big + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + Select Operator + Select Operator + Group By Operator + aggregations: + expr: count() + bucketGroup: false + mode: hash + outputColumnNames: _col0 + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + Needs Tagging: false + Path -> Alias: +#### A masked pattern was here #### + Path -> Partition: +#### A masked pattern was here #### + Partition + base file name: -mr-10002 + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns + columns.types + escape.delim \ + Reduce Operator Tree: + Group By Operator + 
aggregations: + expr: count(VALUE._col0) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0 + Select Operator + expressions: + expr: _col0 + type: bigint + outputColumnNames: _col0 + File Output Operator + compressed: false + GlobalTableId: 0 +#### A masked pattern was here #### + NumFilesPerFileSink: 1 +#### A masked pattern was here #### + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + columns _col0 + columns.types bigint + escape.delim \ + serialization.format 1 + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_big +PREHOOK: Input: default@bucket_small +#### A masked pattern was here #### +POSTHOOK: query: select /* + MAPJOIN(a) */ count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucket_big +POSTHOOK: Input: default@bucket_small +#### A masked pattern was here #### +464