Mailing-List: contact hive-commits-help@hadoop.apache.org; run by ezmlm
Reply-To: hive-dev@hadoop.apache.org
Subject: svn commit: r919252 [1/6] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/...
Date: Fri, 05 Mar 2010 00:53:44 -0000
To: hive-commits@hadoop.apache.org
From: namit@apache.org
Message-Id: <20100305005344.F066823888E7@eris.apache.org>

Author: namit
Date: Fri Mar 5 00:53:43 2010
New Revision: 919252

URL: http://svn.apache.org/viewvc?rev=919252&view=rev
Log:
HIVE-1194. Add sort merge join (He Yongqiang via namit)
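For context: this commit gates the new operator behind a configuration flag, hive.optimize.bucketmapjoin.sortedmerge, added to HiveConf.java below, and it builds on the existing bucket map join optimization (hive.optimize.bucketmapjoin). A minimal sketch of enabling both flags programmatically — it assumes HiveConf's static setBoolVar helper, which is not part of this commit:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableSmbJoin {
      // Returns a conf with the sorted merge bucket map join optimization on.
      // Both flags default to false; the sorted-merge variant is only
      // considered on top of the plain bucket map join optimization.
      public static HiveConf smbJoinConf() {
        HiveConf conf = new HiveConf();
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN, true);
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN, true);
        return conf;
      }
    }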
Added:
    hadoop/hive/trunk/data/files/smbbucket_1.txt
    hadoop/hive/trunk/data/files/smbbucket_2.txt
    hadoop/hive/trunk/data/files/smbbucket_3.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/smb_bucketmapjoin.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_2.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_3.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_4.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_5.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_6.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/smb_bucketmapjoin.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_6.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Mar 5 00:53:43 2010
@@ -45,6 +45,9 @@ HIVE-1197.
Add BucketizedHiveInputFormat (Siying Dong via namit) + HIVE-1194. Add sort merge join + (He Yongqiang via namit) + IMPROVEMENTS HIVE-983. Function from_unixtime takes long. (Ning Zhang via zshao) Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Mar 5 00:53:43 2010 @@ -200,6 +200,7 @@ HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join + HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join ; public final String varname; Added: hadoop/hive/trunk/data/files/smbbucket_1.txt URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/smbbucket_1.txt?rev=919252&view=auto ============================================================================== --- hadoop/hive/trunk/data/files/smbbucket_1.txt (added) +++ hadoop/hive/trunk/data/files/smbbucket_1.txt Fri Mar 5 00:53:43 2010 @@ -0,0 +1,5 @@ +1val_1 +3val_3 +4val_4 +5val_5 +10val_10 Added: hadoop/hive/trunk/data/files/smbbucket_2.txt URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/smbbucket_2.txt?rev=919252&view=auto ============================================================================== --- hadoop/hive/trunk/data/files/smbbucket_2.txt (added) +++ hadoop/hive/trunk/data/files/smbbucket_2.txt Fri Mar 5 00:53:43 2010 @@ -0,0 +1,4 @@ +20val_20 +23val_23 +25val_25 +30val_30 Added: hadoop/hive/trunk/data/files/smbbucket_3.txt URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/smbbucket_3.txt?rev=919252&view=auto ============================================================================== --- hadoop/hive/trunk/data/files/smbbucket_3.txt (added) +++ hadoop/hive/trunk/data/files/smbbucket_3.txt Fri Mar 5 00:53:43 2010 @@ -0,0 +1,6 @@ +4val_4 +10val_10 +17val_17 +19val_19 +20val_20 +23val_23 Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=919252&view=auto ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (added) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Fri Mar 5 00:53:43 2010 @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public abstract class AbstractMapJoinOperator<T extends MapJoinDesc> extends CommonJoinOperator<T> implements
+    Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The expressions for the join inputs' join keys.
+   */
+  protected transient Map<Byte, List<ExprNodeEvaluator>> joinKeys;
+  /**
+   * The ObjectInspectors for the join inputs' join keys.
+   */
+  protected transient Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
+  /**
+   * The standard ObjectInspectors for the join inputs' join keys.
+   */
+  protected transient Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
+
+  protected transient int posBigTable = -1; // one of the tables that is not in memory
+  transient int mapJoinRowsKey; // rows for a given key
+
+  protected transient RowContainer<ArrayList<Object>> emptyList = null;
+
+  transient int numMapRowsRead;
+
+  private static final transient String[] FATAL_ERR_MSG = {
+      null, // counter value 0 means no error
+      "Mapside join size exceeds hive.mapjoin.maxsize. "
+          + "Please increase that or remove the mapjoin hint."
+  };
+
+  transient boolean firstRow;
+  transient int heartbeatInterval;
+
+  public AbstractMapJoinOperator() {
+  }
+
+  public AbstractMapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {
+    super((CommonJoinOperator)mjop);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    numMapRowsRead = 0;
+    firstRow = true;
+    heartbeatInterval = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVESENDHEARTBEAT);
+
+    joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
+
+    populateJoinKeyValue(joinKeys, conf.getKeys());
+    joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys,
+        inputObjInspectors);
+    joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
+
+    // all other tables are small, and are cached in the hash table
+    posBigTable = conf.getPosBigTable();
+
+    emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
+    RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable,
+        order[posBigTable], joinCacheSize);
+    storage.put((byte) posBigTable, bigPosRC);
+
+    mapJoinRowsKey = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
+
+    List<? extends StructField> structFields = ((StructObjectInspector) outputObjInspector)
+        .getAllStructFieldRefs();
+    if (conf.getOutputColumnNames().size() < structFields.size()) {
+      List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
+      for (Byte alias : order) {
+        int sz = conf.getExprs().get(alias).size();
+        List<Integer> retained = conf.getRetainList().get(alias);
+        for (int i = 0; i < sz; i++) {
+          int pos = retained.get(i);
+          structFieldObjectInspectors.add(structFields.get(pos)
+              .getFieldObjectInspector());
+        }
+      }
+      outputObjInspector = ObjectInspectorFactory
+          .getStandardStructObjectInspector(conf.getOutputColumnNames(),
+              structFieldObjectInspectors);
+    }
+    initializeChildren(hconf);
+  }
+
+  @Override
+  protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
+    errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): "
+        + FATAL_ERR_MSG[(int) counterCode]);
+  }
+
+  protected void reportProgress() {
+    // Send some status periodically
+    numMapRowsRead++;
+    if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) {
+      reporter.progress();
+    }
+  }
+
+  @Override
+  public int getType() {
+    return OperatorType.MAPJOIN;
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Mar 5 00:53:43 2010
@@ -110,7 +110,7 @@
   // be output
   protected transient JoinCondDesc[] condn;
   protected transient boolean noOuterJoin;
-  private transient Object[] dummyObj; // for outer joins, contains the
+  protected transient Object[] dummyObj; // for outer joins, contains the
   // potential nulls for the concerned
   // aliases
   protected transient RowContainer<ArrayList<Object>>[] dummyObjVectors; // empty
@@ -140,6 +140,48 @@
   transient boolean handleSkewJoin = false;
 
+  public CommonJoinOperator() {
+  }
+
+  public CommonJoinOperator(CommonJoinOperator<T> clone) {
+    this.joinEmitInterval = clone.joinEmitInterval;
+    this.joinCacheSize = clone.joinCacheSize;
+    this.nextSz = clone.nextSz;
+    this.childOperators = clone.childOperators;
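The reportProgress() helper above factors the per-row heartbeat out of MapJoinOperator so the new SMBMapJoinOperator (added below) can reuse it. A standalone sketch of the same pattern — the class and method names here are illustrative, not from the commit; Hive reads the interval via HiveConf.ConfVars.HIVESENDHEARTBEAT as shown above:

    import org.apache.hadoop.mapred.Reporter;

    public class HeartbeatSketch {
      // Rows to process between progress calls (Hive gets this from conf).
      private final int heartbeatInterval = 1000;
      private long rowsRead = 0;

      // Call once per input row: pings the framework periodically so a
      // long-running join that emits no output is not killed as a hung task.
      void onRow(Reporter reporter) {
        rowsRead++;
        if ((rowsRead % heartbeatInterval == 0) && (reporter != null)) {
          reporter.progress();
        }
      }
    }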
+    this.parentOperators = clone.parentOperators;
+    this.counterNames = clone.counterNames;
+    this.counterNameToEnum = clone.counterNameToEnum;
+    this.done = clone.done;
+    this.operatorId = clone.operatorId;
+    this.storage = clone.storage;
+    this.condn = clone.condn;
+
+    this.setSchema(clone.getSchema());
+
+    this.alias = clone.alias;
+    this.beginTime = clone.beginTime;
+    this.inputRows = clone.inputRows;
+    this.childOperatorsArray = clone.childOperatorsArray;
+    this.childOperatorsTag = clone.childOperatorsTag;
+    this.colExprMap = clone.colExprMap;
+    this.counters = clone.counters;
+    this.dummyObj = clone.dummyObj;
+    this.dummyObjVectors = clone.dummyObjVectors;
+    this.forwardCache = clone.forwardCache;
+    this.groupKeyObject = clone.groupKeyObject;
+    this.handleSkewJoin = clone.handleSkewJoin;
+    this.hconf = clone.hconf;
+    this.id = clone.id;
+    this.inputObjInspectors = clone.inputObjInspectors;
+    this.noOuterJoin = clone.noOuterJoin;
+    this.numAliases = clone.numAliases;
+    this.posToAliasMap = clone.posToAliasMap;
+    this.spillTableDesc = clone.spillTableDesc;
+    this.statsMap = clone.statsMap;
+  }
+
   protected int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
       Map<Byte, List<ExprNodeDesc>> inputMap) {
@@ -224,8 +266,6 @@
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.handleSkewJoin = conf.getHandleSkewJoin();
     this.hconf = hconf;
-    LOG.info("COMMONJOIN "
-        + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
     totalSz = 0;
     // Map that contains the rows for each alias
     storage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
@@ -699,7 +739,7 @@
    * maintained (inputNulls) where each entry denotes whether the element is to
    * be used or not (whether it is null or not). The size of the bitvector is
    * same as the number of inputs under consideration currently. When all inputs
-   * are accounted for, the output is forwared appropriately.
+   * are accounted for, the output is forwarded appropriately.
*/ private void genObject(ArrayList inputNulls, int aliasNum, IntermediateObject intObj, boolean firstRow) throws HiveException { Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Fri Mar 5 00:53:43 2010 @@ -68,6 +68,32 @@ private long nextCntr = 1; private String lastInputFile = null; private MapredLocalWork localWork = null; + + private ExecMapperContext execContext = new ExecMapperContext(); + + public static class ExecMapperContext { + boolean inputFileChanged = false; + String currentInputFile; + JobConf jc; + public boolean isInputFileChanged() { + return inputFileChanged; + } + public void setInputFileChanged(boolean inputFileChanged) { + this.inputFileChanged = inputFileChanged; + } + public String getCurrentInputFile() { + return currentInputFile; + } + public void setCurrentInputFile(String currentInputFile) { + this.currentInputFile = currentInputFile; + } + public JobConf getJc() { + return jc; + } + public void setJc(JobConf jc) { + this.jc = jc; + } + } @Override public void configure(JobConf job) { @@ -86,6 +112,7 @@ } try { jc = job; + execContext.jc = jc; // create map and fetch operators MapredWork mrwork = Utilities.getMapRedWork(job); mo = new MapOperator(); @@ -93,8 +120,10 @@ // initialize map operator mo.setChildren(job); l4j.info(mo.dump(0)); + mo.setExecContext(execContext); + mo.initializeLocalWork(jc); mo.initialize(jc, null); - + // initialize map local work localWork = mrwork.getMapLocalWork(); if (localWork == null) { @@ -112,6 +141,7 @@ for (Map.Entry entry : fetchOperators.entrySet()) { Operator forwardOp = localWork.getAliasToWork() .get(entry.getKey()); + forwardOp.setExecContext(execContext); // All the operators need to be initialized before process forwardOp.initialize(jc, new ObjectInspector[] {entry.getValue() .getOutputObjectInspector()}); @@ -141,11 +171,12 @@ mo.setReporter(rp); } - if (localWork != null - && (this.lastInputFile == null || - (localWork.getInputFileChangeSensitive() && inputFileChanged()))) { + if(inputFileChanged()) { + if (this.localWork != null + && (localWork.getInputFileChangeSensitive() || this.lastInputFile == null)) { + processMapLocalWork(localWork.getInputFileChangeSensitive()); + } this.lastInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME); - processMapLocalWork(localWork.getInputFileChangeSensitive()); } try { @@ -188,10 +219,13 @@ */ private boolean inputFileChanged() { String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME); + execContext.currentInputFile = currentInputFile; if (this.lastInputFile == null || !this.lastInputFile.equals(currentInputFile)) { + execContext.inputFileChanged = true; return true; } + execContext.inputFileChanged = false; return false; } Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- 
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Mar 5 00:53:43 2010 @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.File; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -50,32 +49,14 @@ /** * Map side Join operator implementation. */ -public class MapJoinOperator extends CommonJoinOperator implements +public class MapJoinOperator extends AbstractMapJoinOperator implements Serializable { private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(MapJoinOperator.class .getName()); - /** - * The expressions for join inputs's join keys. - */ - protected transient Map> joinKeys; - /** - * The ObjectInspectors for the join inputs's join keys. - */ - protected transient Map> joinKeysObjectInspectors; - /** - * The standard ObjectInspectors for the join inputs's join keys. - */ - protected transient Map> joinKeysStandardObjectInspectors; - - private transient int posBigTable; // one of the tables that is not in memory - transient int mapJoinRowsKey; // rows for a given key - protected transient Map> mapJoinTables; - protected transient RowContainer> emptyList = null; - private static final transient String[] FATAL_ERR_MSG = { null, // counter value 0 means no error "Mapside join size exceeds hive.mapjoin.maxsize. " @@ -135,43 +116,30 @@ return mapMetadata; } - transient boolean firstRow; - transient int metadataKeyTag; transient int[] metadataValueTag; - transient List hTables; - transient int numMapRowsRead; - transient int heartbeatInterval; transient int maxMapJoinSize; + + public MapJoinOperator() { + } + + public MapJoinOperator(AbstractMapJoinOperator mjop) { + super(mjop); + } @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); - numMapRowsRead = 0; - - firstRow = true; - heartbeatInterval = HiveConf.getIntVar(hconf, - HiveConf.ConfVars.HIVESENDHEARTBEAT); + maxMapJoinSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAXMAPJOINSIZE); - joinKeys = new HashMap>(); - - populateJoinKeyValue(joinKeys, conf.getKeys()); - joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, - inputObjInspectors); - joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors); - - // all other tables are small, and are cached in the hash table - posBigTable = conf.getPosBigTable(); - metadataValueTag = new int[numAliases]; for (int pos = 0; pos < numAliases; pos++) { metadataValueTag[pos] = -1; } mapJoinTables = new HashMap>(); - hTables = new ArrayList(); // initialize the hash tables for other tables for (int pos = 0; pos < numAliases; pos++) { @@ -186,33 +154,6 @@ mapJoinTables.put(Byte.valueOf((byte) pos), hashTable); } - - emptyList = new RowContainer>(1, hconf); - RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable, - order[posBigTable], joinCacheSize); - storage.put((byte) posBigTable, bigPosRC); - - mapJoinRowsKey = HiveConf.getIntVar(hconf, - HiveConf.ConfVars.HIVEMAPJOINROWSIZE); - - List structFields = ((StructObjectInspector) outputObjInspector) - .getAllStructFieldRefs(); - if (conf.getOutputColumnNames().size() < structFields.size()) { - List structFieldObjectInspectors = new ArrayList(); - for (Byte alias : order) { - int sz = conf.getExprs().get(alias).size(); - List retained = conf.getRetainList().get(alias); - for (int i = 0; i < sz; i++) { - 
int pos = retained.get(i); - structFieldObjectInspectors.add(structFields.get(pos) - .getFieldObjectInspector()); - } - } - outputObjInspector = ObjectInspectorFactory - .getStandardStructObjectInspector(conf.getOutputColumnNames(), - structFieldObjectInspectors); - } - initializeChildren(hconf); } @Override @@ -258,11 +199,7 @@ firstRow = false; } - // Send some status periodically - numMapRowsRead++; - if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) { - reporter.progress(); - } + reportProgress(); if ((numMapRowsRead > maxMapJoinSize) && (reporter != null) && (counterNameToEnum != null)) { @@ -380,7 +317,7 @@ } super.closeOp(abort); } - + /** * Implements the getName function for the Node Interface. * Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Mar 5 00:53:43 2010 @@ -29,6 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.ExecMapper.ExecMapperContext; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.Explain; @@ -67,6 +68,8 @@ * run-time while extracting the operator specific counts. */ protected HashMap counterNameToEnum; + + private transient ExecMapperContext execContext; private static int seqId; @@ -284,7 +287,7 @@ } return true; } - + /** * Initializes operators only if all parents have been initialized. Calls * operator specific initializer which then initializes child ops. @@ -340,11 +343,23 @@ } // derived classes can set this to different object if needed outputObjInspector = inputObjInspectors[0]; - + + //pass the exec context to child operators + passExecContext(this.execContext); + initializeOp(hconf); LOG.info("Initialization Done " + id + " " + getName()); } - + + public void initializeLocalWork(Configuration hconf) throws HiveException { + if (childOperators != null) { + for (int i =0; i childOp = this.childOperators.get(i); + childOp.initializeLocalWork(hconf); + } + } + } + /** * Operator specific initialization. */ @@ -371,6 +386,18 @@ } } } + + /** + * Pass the execContext reference to every child operator + */ + public void passExecContext(ExecMapperContext execContext) { + this.setExecContext(execContext); + if(childOperators != null) { + for (int i = 0; i < childOperators.size(); i++) { + childOperators.get(i).passExecContext(execContext); + } + } + } /** * Collects all the parent's output object inspectors and calls actual @@ -398,7 +425,7 @@ // call the actual operator initialization function initialize(hconf, null); } - + /** * Process the row. 
* @@ -470,7 +497,7 @@ LOG.debug("End group Done"); } - private boolean allInitializedParentsAreClosed() { + protected boolean allInitializedParentsAreClosed() { if (parentOperators != null) { for (Operator parent : parentOperators) { if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) { @@ -1117,4 +1144,18 @@ public Object getGroupKeyObject() { return groupKeyObject; } + + public ExecMapperContext getExecContext() { + return execContext; + } + + public void setExecContext(ExecMapperContext execContext) { + this.execContext = execContext; + if(this.childOperators != null) { + for (int i = 0; i op = this.childOperators.get(i); + op.setExecContext(execContext); + } + } + } } Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Fri Mar 5 00:53:43 2010 @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.plan.LimitDesc; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; import org.apache.hadoop.hive.ql.plan.ScriptDesc; import org.apache.hadoop.hive.ql.plan.SelectDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -74,6 +75,7 @@ opvec.add(new OpTuple(GroupByDesc.class, GroupByOperator.class)); opvec.add(new OpTuple(JoinDesc.class, JoinOperator.class)); opvec.add(new OpTuple(MapJoinDesc.class, MapJoinOperator.class)); + opvec.add(new OpTuple(SMBJoinDesc.class, SMBMapJoinOperator.class)); opvec.add(new OpTuple(LimitDesc.class, LimitOperator.class)); opvec.add(new OpTuple(TableScanDesc.class, TableScanOperator.class)); opvec.add(new OpTuple(UnionDesc.class, UnionOperator.class)); Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=919252&view=auto ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (added) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Mar 5 00:53:43 2010 @@ -0,0 +1,519 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Sorted Merge Map Join Operator. + */ +public class SMBMapJoinOperator extends AbstractMapJoinOperator implements + Serializable { + + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(SMBMapJoinOperator.class + .getName()); + + private MapredLocalWork localWork = null; + private Map fetchOperators; + transient Map> keyWritables; + transient Map> nextKeyWritables; + HashMap>> nextGroupStorage; + HashMap>> candidateStorage; + + transient HashMap tagToAlias; + private transient HashMap fetchOpDone = new HashMap(); + private transient HashMap foundNextKeyGroup = new HashMap(); + transient boolean firstFetchHappened = false; + transient boolean localWorkInited = false; + + public SMBMapJoinOperator() { + } + + public SMBMapJoinOperator(AbstractMapJoinOperator mapJoinOp) { + super(mapJoinOp); + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + + firstRow = true; + + closeCalled = false; + + nextGroupStorage = new HashMap>>(); + candidateStorage = new HashMap>>(); + int bucketSize = HiveConf.getIntVar(hconf, + HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE); + byte storePos = (byte) 0; + for (Byte alias : order) { + RowContainer rc = getRowContainer(hconf, storePos, alias, bucketSize); + nextGroupStorage.put((byte) storePos, rc); + RowContainer candidateRC = getRowContainer(hconf, storePos, alias, + bucketSize); + candidateStorage.put(alias, candidateRC); + storePos++; + } + tagToAlias = conf.getTagToAlias(); + keyWritables = new HashMap>(); + nextKeyWritables = new HashMap>(); + + for (Byte alias : order) { + if(alias != (byte) posBigTable) { + fetchOpDone.put(alias, Boolean.FALSE);; + } + foundNextKeyGroup.put(alias, Boolean.FALSE); + } + } + + @Override + public void initializeLocalWork(Configuration hconf) throws HiveException { + initializeMapredLocalWork(this.getConf(), hconf, this.getConf().getLocalWork(), LOG); + super.initializeLocalWork(hconf); + } + + public void initializeMapredLocalWork(MapJoinDesc conf, Configuration hconf, + MapredLocalWork localWork, Log l4j) throws HiveException { + if (localWork == null || localWorkInited) { + return; + } + localWorkInited = true; + this.localWork = localWork; + fetchOperators = new HashMap(); + // create map local 
operators
+    for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
+        .entrySet()) {
+      fetchOperators.put(entry.getKey(), new FetchOperator(entry.getValue(),
+          new JobConf(hconf)));
+      if (l4j != null) {
+        l4j.info("fetchoperator for " + entry.getKey() + " created");
+      }
+    }
+
+    for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+      Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+          .get(entry.getKey());
+      // All the operators need to be initialized before process
+      forwardOp.setExecContext(this.getExecContext());
+      forwardOp.initialize(this.getExecContext().jc, new ObjectInspector[] {entry.getValue()
+          .getOutputObjectInspector()});
+      l4j.info("fetchoperator for " + entry.getKey() + " initialized");
+    }
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+
+    if (this.getExecContext().inputFileChanged) {
+      if (firstFetchHappened) {
+        // we need to first join and flush out the data left over from the previous file.
+        joinFinalLeftData();
+      }
+      // set up the fetch operators for the new input file.
+      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+        String alias = entry.getKey();
+        FetchOperator fetchOp = entry.getValue();
+        fetchOp.clearFetchContext();
+        setUpFetchOpContext(fetchOp, alias);
+      }
+      this.getExecContext().inputFileChanged = false;
+      firstFetchHappened = false;
+    }
+
+    if (!firstFetchHappened) {
+      firstFetchHappened = true;
+      // fetch the first group for all small table aliases
+      for (Byte t : order) {
+        if (t != (byte) posBigTable) {
+          fetchNextGroup(t);
+        }
+      }
+    }
+
+    byte alias = (byte) tag;
+    // compute keys and values as StandardObjects
+    ArrayList<Object> key = computeValues(row, joinKeys.get(alias),
+        joinKeysObjectInspectors.get(alias));
+    ArrayList<Object> value = computeValues(row, joinValues.get(alias),
+        joinValuesObjectInspectors.get(alias));
+
+    // have we reached a new key group?
+    boolean nextKeyGroup = processKey(alias, key);
+    if (nextKeyGroup) {
+      //assert this.nextGroupStorage.get(alias).size() == 0;
+      this.nextGroupStorage.get(alias).add(value);
+      foundNextKeyGroup.put((byte) tag, Boolean.TRUE);
+      if (tag != posBigTable) {
+        return;
+      }
+    }
+
+    reportProgress();
+
+    // the big table has reached a new key group. try to let the small tables
+    // catch up with the big table.
+    if (nextKeyGroup) {
+      assert tag == (byte) posBigTable;
+      List<Byte> smallestPos = null;
+      do {
+        smallestPos = joinOneGroup();
+        // jump out of the loop if we need input from the big table
+      } while (smallestPos != null && smallestPos.size() > 0
+          && !smallestPos.contains((byte) this.posBigTable));
+
+      return;
+    }
+
+    assert !nextKeyGroup;
+    candidateStorage.get((byte) tag).add(value);
+  }
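processOp above buffers rows for the current key and flags a boundary when the incoming key differs from the buffered one (see processKey further down). The same boundary detection, reduced to a self-contained sketch over a sorted stream — GroupBuffer and onGroupEnd are illustrative names, not part of the commit:

    import java.util.ArrayList;
    import java.util.List;

    public class GroupBuffer {
      private Integer currentKey = null;
      private final List<String> currentGroup = new ArrayList<String>();

      // Feed sorted (key, value) pairs; flush the buffered group when the key changes.
      void accept(int key, String value) {
        if (currentKey != null && key != currentKey) {
          onGroupEnd(currentKey, currentGroup); // this completed group can now be joined
          currentGroup.clear();
        }
        currentKey = key;
        currentGroup.add(value);
      }

      void onGroupEnd(int key, List<String> group) {
        System.out.println("key " + key + " -> " + group);
      }
    }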
+
+  /*
+   * This happens either when the input file of the big table has changed or in
+   * closeOp. It needs to fetch all the data left in the small tables and try
+   * to join it.
+   */
+  private void joinFinalLeftData() throws HiveException {
+    RowContainer<ArrayList<Object>> bigTblRowContainer = this.candidateStorage.get((byte) this.posBigTable);
+
+    boolean allFetchOpDone = allFetchOpDone();
+    // if all the data left in the small tables is less than or equal to the
+    // data left in the big table, let them catch up
+    while (bigTblRowContainer != null && bigTblRowContainer.size() > 0
+        && !allFetchOpDone) {
+      joinOneGroup();
+      bigTblRowContainer = this.candidateStorage.get((byte) this.posBigTable);
+      allFetchOpDone = allFetchOpDone();
+    }
+
+    if (allFetchOpDone
+        && this.candidateStorage.get((byte) this.posBigTable).size() > 0) {
+      // all fetch operators for the small tables are done and there is data
+      // left in the big table
+      for (byte t : order) {
+        if (this.foundNextKeyGroup.get(t) && this.nextKeyWritables.get(t) != null) {
+          promoteNextGroupToCandidate(t);
+        }
+      }
+      joinOneGroup();
+    } else {
+      while (!allFetchOpDone) {
+        List<Byte> ret = joinOneGroup();
+        if (ret == null || ret.size() == 0) {
+          break;
+        }
+
+        reportProgress();
+
+        allFetchOpDone = allFetchOpDone();
+      }
+      // one final table left
+      for (byte t : order) {
+        if (this.foundNextKeyGroup.get(t) && this.nextKeyWritables.get(t) != null) {
+          promoteNextGroupToCandidate(t);
+        }
+      }
+      joinOneGroup();
+    }
+  }
+
+  private boolean allFetchOpDone() {
+    boolean allFetchOpDone = true;
+    for (Byte tag : order) {
+      if (tag == (byte) posBigTable) {
+        continue;
+      }
+      allFetchOpDone = allFetchOpDone && fetchOpDone.get(tag);
+    }
+    return allFetchOpDone;
+  }
+
+  private List<Byte> joinOneGroup() throws HiveException {
+    int smallestPos = findMostSmallKey();
+    List<Byte> listOfNeedFetchNext = null;
+    if (smallestPos >= 0) {
+      listOfNeedFetchNext = joinObject(smallestPos);
+      if (listOfNeedFetchNext.size() > 0) {
+        // listOfNeedFetchNext contains all tables whose candidateStorage rows
+        // have just been joined; clear their candidate storage, promote their
+        // nextGroupStorage to candidateStorage, and fetch data until we reach
+        // a new group.
+        for (Byte b : listOfNeedFetchNext) {
+          fetchNextGroup(b);
+        }
+      }
+    }
+    return listOfNeedFetchNext;
+  }
+
+  private List<Byte> joinObject(int smallestPos) throws HiveException {
+    List<Byte> needFetchList = new ArrayList<Byte>();
+    ArrayList<Object> smallKey = keyWritables.get((byte) smallestPos);
+    needFetchList.add((byte) smallestPos);
+    this.storage.put((byte) smallestPos, this.candidateStorage.get((byte) smallestPos));
+    for (Byte i : order) {
+      if ((byte) smallestPos == i) {
+        continue;
+      }
+      ArrayList<Object> key = keyWritables.get(i);
+      if (key == null) {
+        putDummyOrEmpty(i);
+      } else {
+        int cmp = compareKeys(key, smallKey);
+        if (cmp == 0) {
+          this.storage.put((byte) i, this.candidateStorage.get((byte) i));
+          needFetchList.add(i);
+        } else {
+          putDummyOrEmpty(i);
+        }
+      }
+    }
+    checkAndGenObject();
+    for (Byte pos : needFetchList) {
+      this.candidateStorage.get(pos).clear();
+      this.keyWritables.remove(pos);
+    }
+    return needFetchList;
+  }
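joinOneGroup/joinObject above implement the core merge: pick the table whose current key is smallest, emit or skip its group, and advance that table. Reduced to two inputs, this is the classic two-pointer merge; the sketch below runs it on the key columns of the smbbucket_1.txt and smbbucket_3.txt data files added by this commit, joining on keys 4 and 10:

    public class SortedMergeSketch {
      public static void main(String[] args) {
        int[] a = {1, 3, 4, 5, 10};          // keys in smbbucket_1.txt
        int[] b = {4, 10, 17, 19, 20, 23};   // keys in smbbucket_3.txt
        int i = 0, j = 0;
        // Advance whichever side holds the smaller key; equal keys join.
        while (i < a.length && j < b.length) {
          if (a[i] < b[j]) {
            i++; // inner-join semantics: an unmatched key is dropped
          } else if (a[i] > b[j]) {
            j++;
          } else {
            System.out.println("join on key " + a[i]); // prints 4, then 10
            i++;
            j++;
          }
        }
      }
    }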
+
+  private void fetchNextGroup(Byte t) throws HiveException {
+    if (foundNextKeyGroup.get(t)) {
+      // first promote the next group to be the current group if we reached a
+      // new group in the previous fetch
+      if (this.nextKeyWritables.get(t) != null) {
+        promoteNextGroupToCandidate(t);
+      } else {
+        this.keyWritables.remove(t);
+        this.candidateStorage.remove(t);
+        this.nextGroupStorage.remove(t);
+      }
+      foundNextKeyGroup.put(t, Boolean.FALSE);
+    }
+    // for the big table, we only need to promote the next group to the current group.
+    if (t == (byte) posBigTable) {
+      return;
+    }
+
+    // for tables other than the big table, we need to fetch more data until we
+    // reach a new group or are done.
+    while (!foundNextKeyGroup.get(t)) {
+      if (fetchOpDone.get(t)) {
+        break;
+      }
+      fetchOneRow(t);
+    }
+    if (!foundNextKeyGroup.get(t) && fetchOpDone.get(t)) {
+      this.nextKeyWritables.remove(t);
+    }
+  }
+
+  private void promoteNextGroupToCandidate(Byte t) throws HiveException {
+    this.keyWritables.put(t, this.nextKeyWritables.get(t));
+    this.nextKeyWritables.remove(t);
+    RowContainer<ArrayList<Object>> oldRowContainer = this.candidateStorage.get(t);
+    oldRowContainer.clear();
+    this.candidateStorage.put(t, this.nextGroupStorage.get(t));
+    this.nextGroupStorage.put(t, oldRowContainer);
+  }
+
+  private int compareKeys(ArrayList<Object> k1, ArrayList<Object> k2) {
+    int ret = 0;
+    for (int i = 0; i < k1.size() && i < k2.size(); i++) {
+      WritableComparable key_1 = (WritableComparable) k1.get(i);
+      WritableComparable key_2 = (WritableComparable) k2.get(i);
+      ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2);
+      if (ret != 0) {
+        return ret;
+      }
+    }
+    return k1.size() - k2.size();
+  }
+
+  private void putDummyOrEmpty(Byte i) {
+    // put an empty list or null
+    if (noOuterJoin) {
+      storage.put(i, emptyList);
+    } else {
+      storage.put(i, dummyObjVectors[i.intValue()]);
+    }
+  }
+
+  private int findMostSmallKey() {
+    byte index = -1;
+    ArrayList<Object> mostSmallOne = null;
+
+    for (byte i : order) {
+      ArrayList<Object> key = keyWritables.get(i);
+      if (key == null) {
+        continue;
+      }
+      if (mostSmallOne == null) {
+        mostSmallOne = key;
+        index = i;
+        continue;
+      }
+      int cmp = compareKeys(key, mostSmallOne);
+      if (cmp < 0) {
+        mostSmallOne = key;
+        index = i;
+      }
+    }
+    return index;
+  }
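compareKeys above orders composite join keys field by field through Hadoop's registered WritableComparators, with the shorter key ordering first on a tie. A standalone illustration of the same call pattern, with hypothetical two-field keys built from the commit's sample data values:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class CompareKeysSketch {
      // Same shape as compareKeys above: the first differing field decides.
      static int compare(List<Object> k1, List<Object> k2) {
        for (int i = 0; i < k1.size() && i < k2.size(); i++) {
          WritableComparable key1 = (WritableComparable) k1.get(i);
          WritableComparable key2 = (WritableComparable) k2.get(i);
          int ret = WritableComparator.get(key1.getClass()).compare(key1, key2);
          if (ret != 0) {
            return ret;
          }
        }
        return k1.size() - k2.size();
      }

      public static void main(String[] args) {
        List<Object> a = Arrays.<Object>asList(new IntWritable(4), new Text("val_4"));
        List<Object> b = Arrays.<Object>asList(new IntWritable(10), new Text("val_10"));
        System.out.println(compare(a, b) < 0); // true: key 4 sorts before key 10
      }
    }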
+
+  private boolean processKey(byte alias, ArrayList<Object> key)
+      throws HiveException {
+    ArrayList<Object> keyWritable = keyWritables.get(alias);
+    if (keyWritable == null) {
+      // the first group.
+      keyWritables.put(alias, key);
+      return false;
+    } else {
+      int cmp = compareKeys(key, keyWritable);
+      if (cmp != 0) {
+        nextKeyWritables.put(alias, key);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  private void setUpFetchOpContext(FetchOperator fetchOp, String alias) {
+    String currentInputFile = this.getExecContext().currentInputFile;
+    BucketMapJoinContext bucketMatcherCxt = this.localWork
+        .getBucketMapjoinContext();
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
+        .getBucketMatcherClass();
+    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
+        bucketMatcherCls, null);
+    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
+        .getAliasBucketFileNameMapping());
+    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
+        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
+    Iterator<Path> iter = aliasFiles.iterator();
+    fetchOp.setupContext(iter, null);
+  }
+
+  private void fetchOneRow(byte tag) {
+    if (fetchOperators != null) {
+      String tble = this.tagToAlias.get(tag);
+      FetchOperator fetchOp = fetchOperators.get(tble);
+
+      Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+          .get(tble);
+      try {
+        InspectableObject row = fetchOp.getNextRow();
+        if (row == null) {
+          this.fetchOpDone.put(tag, Boolean.TRUE);
+          return;
+        }
+        forwardOp.process(row.o, 0);
+        // check if any operator had a fatal error or early exit during
+        // execution
+        if (forwardOp.getDone()) {
+          this.fetchOpDone.put(tag, Boolean.TRUE);
+        }
+      } catch (Throwable e) {
+        if (e instanceof OutOfMemoryError) {
+          // Don't create a new object if we are already out of memory
+          throw (OutOfMemoryError) e;
+        } else {
+          throw new RuntimeException("Map local work failed", e);
+        }
+      }
+    }
+  }
+
+  transient boolean closeCalled = false;
+
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    if (closeCalled) {
+      return;
+    }
+    closeCalled = true;
+    joinFinalLeftData();
+    this.firstFetchHappened = false;
+    // clean up
+    for (Byte alias : order) {
+      if (alias != (byte) posBigTable) {
+        fetchOpDone.put(alias, Boolean.FALSE);
+      }
+      foundNextKeyGroup.put(alias, Boolean.FALSE);
+    }
+
+    localWorkInited = false;
+
+    super.closeOp(abort);
+    if (fetchOperators != null) {
+      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+        Operator<? extends Serializable> forwardOp = localWork
+            .getAliasToWork().get(entry.getKey());
+        forwardOp.close(abort);
+      }
+    }
+  }
+
+  protected boolean allInitializedParentsAreClosed() {
+    return true;
+  }
+
+  /**
+   * Implements the getName function for the Node Interface.
+ * + * @return the name of the operator + */ + @Override + public String getName() { + return "MAPJOIN"; + } + + @Override + public int getType() { + return OperatorType.MAPJOIN; + } +} Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Fri Mar 5 00:53:43 2010 @@ -192,6 +192,8 @@ for (int index = 0; index < joinAliases.size(); index++) { String alias = joinAliases.get(index); TableScanOperator tso = (TableScanOperator) topOps.get(alias); + if (tso == null) + return null; Table tbl = topToTable.get(tso); if(tbl.isPartitioned()) { PrunedPartitionList prunedParts = null; Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Fri Mar 5 00:53:43 2010 @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; @@ -54,6 +55,7 @@ import org.apache.hadoop.hive.ql.plan.ExtractDesc; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.LoadFileDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -342,7 +344,7 @@ return dest; } - MapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp(); + AbstractMapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp(); if (currMapJoinOp != null) { opTaskMap.put(null, currTask); Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Fri Mar 5 00:53:43 2010 @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; 
import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; /** @@ -140,7 +142,7 @@ String taskTmpDir; TableDesc tt_desc; Operator rootMapJoinOp; - MapJoinOperator oldMapJoin; + AbstractMapJoinOperator oldMapJoin; public GenMRMapJoinCtx() { taskTmpDir = null; @@ -157,7 +159,7 @@ */ public GenMRMapJoinCtx(String taskTmpDir, TableDesc tt_desc, Operator rootMapJoinOp, - MapJoinOperator oldMapJoin) { + AbstractMapJoinOperator oldMapJoin) { this.taskTmpDir = taskTmpDir; this.tt_desc = tt_desc; this.rootMapJoinOp = rootMapJoinOp; @@ -198,7 +200,7 @@ /** * @return the oldMapJoin */ - public MapJoinOperator getOldMapJoin() { + public AbstractMapJoinOperator getOldMapJoin() { return oldMapJoin; } @@ -206,7 +208,7 @@ * @param oldMapJoin * the oldMapJoin to set */ - public void setOldMapJoin(MapJoinOperator oldMapJoin) { + public void setOldMapJoin(AbstractMapJoinOperator oldMapJoin) { this.oldMapJoin = oldMapJoin; } } @@ -214,7 +216,7 @@ private HiveConf conf; private HashMap, Task> opTaskMap; private HashMap unionTaskMap; - private HashMap mapJoinTaskMap; + private HashMap, GenMRMapJoinCtx> mapJoinTaskMap; private List> seenOps; private List seenFileSinkOps; @@ -226,7 +228,7 @@ private Task currTask; private Operator currTopOp; private UnionOperator currUnionOp; - private MapJoinOperator currMapJoinOp; + private AbstractMapJoinOperator currMapJoinOp; private String currAliasId; private List> rootOps; @@ -289,7 +291,7 @@ rootOps = new ArrayList>(); rootOps.addAll(parseCtx.getTopOps().values()); unionTaskMap = new HashMap(); - mapJoinTaskMap = new HashMap(); + mapJoinTaskMap = new HashMap, GenMRMapJoinCtx>(); } /** @@ -456,7 +458,7 @@ this.currUnionOp = currUnionOp; } - public MapJoinOperator getCurrMapJoinOp() { + public AbstractMapJoinOperator getCurrMapJoinOp() { return currMapJoinOp; } @@ -464,7 +466,7 @@ * @param currMapJoinOp * current map join operator */ - public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) { + public void setCurrMapJoinOp(AbstractMapJoinOperator currMapJoinOp) { this.currMapJoinOp = currMapJoinOp; } @@ -491,11 +493,11 @@ unionTaskMap.put(op, uTask); } - public GenMRMapJoinCtx getMapJoinCtx(MapJoinOperator op) { + public GenMRMapJoinCtx getMapJoinCtx(AbstractMapJoinOperator op) { return mapJoinTaskMap.get(op); } - public void setMapJoinCtx(MapJoinOperator op, GenMRMapJoinCtx mjCtx) { + public void setMapJoinCtx(AbstractMapJoinOperator op, GenMRMapJoinCtx mjCtx) { mapJoinTaskMap.put(op, mjCtx); } Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Fri Mar 5 00:53:43 2010 @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; 
import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; @@ -141,7 +143,7 @@ // If there is a mapjoin at position 'pos' if (uPrsCtx.getMapJoinSubq(pos)) { - MapJoinOperator mjOp = ctx.getCurrMapJoinOp(); + AbstractMapJoinOperator mjOp = ctx.getCurrMapJoinOp(); assert mjOp != null; GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mjOp); assert mjCtx != null; Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Mar 5 00:53:43 2010 @@ -33,11 +33,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; @@ -157,7 +159,7 @@ // The mapjoin has already been encountered. Some context must be stored // about that if (readInputMapJoin) { - MapJoinOperator currMapJoinOp = opProcCtx.getCurrMapJoinOp(); + AbstractMapJoinOperator currMapJoinOp = (AbstractMapJoinOperator) opProcCtx.getCurrMapJoinOp(); assert currMapJoinOp != null; boolean local = ((pos == -1) || (pos == (currMapJoinOp.getConf()) .getPosBigTable())) ? false : true; @@ -217,7 +219,7 @@ seenOps.add(currTopOp); boolean local = (pos == desc.getPosBigTable()) ? 
false : true; setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx); - setupBucketMapJoinInfo(plan, (MapJoinOperator)op); + setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator)op); } opProcCtx.setCurrTask(currTask); @@ -226,16 +228,35 @@ } private static void setupBucketMapJoinInfo(MapredWork plan, - MapJoinOperator currMapJoinOp) { + AbstractMapJoinOperator currMapJoinOp) { if (currMapJoinOp != null) { LinkedHashMap>> aliasBucketFileNameMapping = currMapJoinOp.getConf().getAliasBucketFileNameMapping(); if(aliasBucketFileNameMapping!= null) { MapredLocalWork localPlan = plan.getMapLocalWork(); - if (localPlan == null) { - localPlan = new MapredLocalWork( - new LinkedHashMap>(), - new LinkedHashMap()); + if(localPlan == null) { + if(currMapJoinOp instanceof SMBMapJoinOperator) { + localPlan = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork(); + } + if (localPlan == null) { + localPlan = new MapredLocalWork( + new LinkedHashMap>(), + new LinkedHashMap()); + } + } else { + //local plan is not null, we want to merge it into SMBMapJoinOperator's local work + if(currMapJoinOp instanceof SMBMapJoinOperator) { + MapredLocalWork smbLocalWork = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork(); + if(smbLocalWork != null) { + localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork()); + localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork()); + } + } + } + if(currMapJoinOp instanceof SMBMapJoinOperator) { + plan.setMapLocalWork(null); + ((SMBMapJoinOperator)currMapJoinOp).getConf().setLocalWork(localPlan); + } else { plan.setMapLocalWork(localPlan); } BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext(); @@ -364,11 +385,14 @@ : true; } setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx); + if(op instanceof AbstractMapJoinOperator) { + setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator)op); + } } currTopOp = null; opProcCtx.setCurrTopOp(currTopOp); } else if (opProcCtx.getCurrMapJoinOp() != null) { - MapJoinOperator mjOp = opProcCtx.getCurrMapJoinOp(); + AbstractMapJoinOperator mjOp = (AbstractMapJoinOperator) opProcCtx.getCurrMapJoinOp(); if (readUnionData) { initUnionPlan(opProcCtx, currTask, false); } else { @@ -376,7 +400,7 @@ // In case of map-join followed by map-join, the file needs to be // obtained from the old map join - MapJoinOperator oldMapJoin = mjCtx.getOldMapJoin(); + AbstractMapJoinOperator oldMapJoin = (AbstractMapJoinOperator) mjCtx.getOldMapJoin(); String taskTmpDir = null; TableDesc tt_desc = null; Operator rootOp = null; @@ -819,8 +843,8 @@ setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc); // This can be cleaned up as a function table in future - if (op instanceof MapJoinOperator) { - MapJoinOperator mjOp = (MapJoinOperator) op; + if (op instanceof AbstractMapJoinOperator) { + AbstractMapJoinOperator mjOp = (AbstractMapJoinOperator) op; opProcCtx.setCurrMapJoinOp(mjOp); GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp); if (mjCtx == null) { Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=919252&r1=919251&r2=919252&view=diff ============================================================================== --- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original) +++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Fri Mar 5 00:53:43 2010 @@ 
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Fri Mar 5 00:53:43 2010
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -52,7 +54,7 @@
  */
 public final class MapJoinFactory {
 
-  public static int getPositionParent(MapJoinOperator op, Stack<Node> stack) {
+  public static int getPositionParent(AbstractMapJoinOperator<? extends MapJoinDesc> op, Stack<Node> stack) {
     int pos = 0;
     int size = stack.size();
     assert size >= 2 && stack.get(size - 1) == op;
@@ -72,7 +74,7 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       // find the branch on which this processor was invoked
@@ -122,7 +124,7 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
       GenMRProcContext opProcCtx = (GenMRProcContext) procCtx;
 
       MapredWork cplan = GenMapRedUtils.getMapRedWork();
@@ -133,7 +135,7 @@
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
-      boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false
+      boolean local = (pos == ((MapJoinDesc)(mapJoin.getConf())).getPosBigTable()) ? false
           : true;
 
       GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false,
@@ -180,7 +182,7 @@
         Object... nodeOutputs) throws SemanticException {
       SelectOperator sel = (SelectOperator) nd;
 
-      MapJoinOperator mapJoin = (MapJoinOperator) sel.getParentOperators().get(
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) sel.getParentOperators().get(
           0);
       assert sel.getParentOperators().size() == 1;
@@ -188,7 +190,7 @@
       ParseContext parseCtx = ctx.getParseCtx();
 
       // is the mapjoin followed by a reducer
-      List<MapJoinOperator> listMapJoinOps = parseCtx
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOps = parseCtx
          .getListMapJoinOpsNoReducer();
 
      if (listMapJoinOps.contains(mapJoin)) {
@@ -263,11 +265,11 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       ctx.getParseCtx();
 
-      MapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp();
+      AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin = ctx.getCurrMapJoinOp();
       assert oldMapJoin != null;
       GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
       if (mjCtx != null) {
@@ -335,7 +337,7 @@
       UnionOperator currUnion = ctx.getCurrUnionOp();
       assert currUnion != null;
       ctx.getUnionTask(currUnion);
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
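The hunk above shows only the first three lines of getPositionParent; the rest of the method body falls outside the diff context. What follows is a plausible reconstruction based on the visible assertion that the join sits on top of the walker stack; treat the lookup as an assumption, not the committed code.

    import java.io.Serializable;
    import java.util.Stack;

    import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.plan.MapJoinDesc;

    public final class PositionParentSketch {
      // The rule walker pushes nodes as it descends, so the top of the stack is
      // the join itself and the entry just below it is the branch we arrived on.
      @SuppressWarnings("unchecked")
      public static int getPositionParent(
          AbstractMapJoinOperator<? extends MapJoinDesc> op, Stack<Node> stack) {
        int size = stack.size();
        assert size >= 2 && stack.get(size - 1) == op;
        Operator<? extends Serializable> parent =
            (Operator<? extends Serializable>) stack.get(size - 2);
        // Parent operators are ordered by join position, so the index of the
        // branch we came from is the join position itself.
        return op.getParentOperators().indexOf(parent);
      }
    }

The resulting position feeds the local flag in the processors above: a branch is marked local (loaded by the map task) exactly when it is not the big table of the join.
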
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Fri Mar 5 00:53:43 2010
@@ -28,7 +28,13 @@
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -36,6 +42,7 @@
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -44,21 +51,29 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 
 /**
  * Implementation of one of the rule-based map join optimization. User passes
@@ -68,6 +83,9 @@
  * implemented, this transformation can also be done based on costs.
  */
 public class MapJoinProcessor implements Transform {
+
+  private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName());
+
   private ParseContext pGraphContext;
 
   /**
@@ -84,7 +102,7 @@
     pGraphContext.getOpParseCtx().put(op, ctx);
     return op;
   }
-
+
   /**
   * convert a regular join to a a map-side join.
   *
@@ -101,18 +119,12 @@
     // outer join cannot be performed on a table which is being cached
     JoinDesc desc = op.getConf();
     org.apache.hadoop.hive.ql.plan.JoinCondDesc[] condns = desc.getConds();
-    for (org.apache.hadoop.hive.ql.plan.JoinCondDesc condn : condns) {
-      if (condn.getType() == JoinDesc.FULL_OUTER_JOIN) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-      if ((condn.getType() == JoinDesc.LEFT_OUTER_JOIN)
-          && (condn.getLeft() != mapJoinPos)) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-      if ((condn.getType() == JoinDesc.RIGHT_OUTER_JOIN)
-          && (condn.getRight() != mapJoinPos)) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
+    HiveConf hiveConf = pGraphContext.getConf();
+    boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
+        && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
+    if (!noCheckOuterJoin) {
+      checkMapJoin(mapJoinPos, condns);
     }
 
     RowResolver oldOutputRS = pctx.getOpParseCtx().get(op).getRR();
@@ -243,7 +255,7 @@
         keyTableDesc, valueExprMap, valueTableDescs, outputColumnNames,
         mapJoinPos, joinCondns), new RowSchema(outputRS.getColumnInfos()),
         newPar), outputRS);
-
+
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
     mapJoinOp.setColumnExprMap(colExprMap);
@@ -264,6 +276,24 @@
     return mapJoinOp;
   }
 
+  public static void checkMapJoin(int mapJoinPos,
+      org.apache.hadoop.hive.ql.plan.JoinCondDesc[] condns)
+      throws SemanticException {
+    for (org.apache.hadoop.hive.ql.plan.JoinCondDesc condn : condns) {
+      if (condn.getType() == JoinDesc.FULL_OUTER_JOIN) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+      if ((condn.getType() == JoinDesc.LEFT_OUTER_JOIN)
+          && (condn.getLeft() != mapJoinPos)) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+      if ((condn.getType() == JoinDesc.RIGHT_OUTER_JOIN)
+          && (condn.getRight() != mapJoinPos)) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+  }
+
   private void genSelectPlan(ParseContext pctx, MapJoinOperator input)
       throws SemanticException {
     List<Operator<? extends Serializable>> childOps = input.getChildOperators();
@@ -396,7 +426,7 @@
     }
 
     // Go over the list and find if a reducer is not needed
-    List<MapJoinOperator> listMapJoinOpsNoRed = new ArrayList<MapJoinOperator>();
+    List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoRed = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
@@ -461,8 +491,8 @@
         Object... nodeOutputs) throws SemanticException {
       MapJoinWalkerCtx ctx = (MapJoinWalkerCtx) procCtx;
 
-      MapJoinOperator mapJoin = ctx.getCurrMapJoinOp();
-      List<MapJoinOperator> listRejectedMapJoins = ctx
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = ctx.getCurrMapJoinOp();
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins = ctx
           .getListRejectedMapJoins();
 
       // the mapjoin has already been handled
@@ -471,9 +501,9 @@
         return null;
       }
 
-      List<MapJoinOperator> listMapJoinsNoRed = ctx.getListMapJoinsNoRed();
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed = ctx.getListMapJoinsNoRed();
       if (listMapJoinsNoRed == null) {
-        listMapJoinsNoRed = new ArrayList<MapJoinOperator>();
+        listMapJoinsNoRed = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
       }
       listMapJoinsNoRed.add(mapJoin);
       ctx.setListMapJoins(listMapJoinsNoRed);
@@ -494,11 +524,11 @@
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
       MapJoinWalkerCtx ctx = (MapJoinWalkerCtx) procCtx;
-      MapJoinOperator mapJoin = ctx.getCurrMapJoinOp();
-      List<MapJoinOperator> listRejectedMapJoins = ctx
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = ctx.getCurrMapJoinOp();
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins = ctx
           .getListRejectedMapJoins();
       if (listRejectedMapJoins == null) {
-        listRejectedMapJoins = new ArrayList<MapJoinOperator>();
+        listRejectedMapJoins = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
       }
       listRejectedMapJoins.add(mapJoin);
       ctx.setListRejectedMapJoins(listRejectedMapJoins);
@@ -543,23 +573,23 @@
    *
    */
   public static class MapJoinWalkerCtx implements NodeProcessorCtx {
-    private List<MapJoinOperator> listMapJoinsNoRed;
-    private List<MapJoinOperator> listRejectedMapJoins;
-    private MapJoinOperator currMapJoinOp;
+    private List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed;
+    private List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins;
+    private AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp;
 
     /**
      * @param listMapJoinsNoRed
      */
-    public MapJoinWalkerCtx(List<MapJoinOperator> listMapJoinsNoRed) {
+    public MapJoinWalkerCtx(List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed) {
       this.listMapJoinsNoRed = listMapJoinsNoRed;
       currMapJoinOp = null;
-      listRejectedMapJoins = new ArrayList<MapJoinOperator>();
+      listRejectedMapJoins = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
     }
 
     /**
     * @return the listMapJoins
     */
-    public List<MapJoinOperator> getListMapJoinsNoRed() {
+    public List<AbstractMapJoinOperator<? extends MapJoinDesc>> getListMapJoinsNoRed() {
       return listMapJoinsNoRed;
     }
 
@@ -567,14 +597,14 @@
     * @param listMapJoinsNoRed
     *          the listMapJoins to set
     */
-    public void setListMapJoins(List<MapJoinOperator> listMapJoinsNoRed) {
+    public void setListMapJoins(List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed) {
       this.listMapJoinsNoRed = listMapJoinsNoRed;
     }
 
     /**
     * @return the currMapJoinOp
     */
-    public MapJoinOperator getCurrMapJoinOp() {
+    public AbstractMapJoinOperator<? extends MapJoinDesc> getCurrMapJoinOp() {
       return currMapJoinOp;
     }
 
@@ -582,14 +612,14 @@
     * @param currMapJoinOp
     *          the currMapJoinOp to set
     */
-    public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) {
+    public void setCurrMapJoinOp(AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
       this.currMapJoinOp = currMapJoinOp;
     }
 
     /**
     * @return the listRejectedMapJoins
     */
-    public List<MapJoinOperator> getListRejectedMapJoins() {
+    public List<AbstractMapJoinOperator<? extends MapJoinDesc>> getListRejectedMapJoins() {
       return listRejectedMapJoins;
     }
 
@@ -598,7 +628,7 @@
     *          the listRejectedMapJoins to set
     */
     public void setListRejectedMapJoins(
-        List<MapJoinOperator> listRejectedMapJoins) {
+        List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins) {
       this.listRejectedMapJoins = listRejectedMapJoins;
     }
   }
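Pulling the outer-join validation into the static checkMapJoin makes the rule reusable: convertMapJoin skips it only when both bucket-map-join flags are on, leaving the sort-merge path to apply the same check at its own point in planning. The rule itself: the cached (small) side of an outer join would silently drop unmatched rows, so the streamed big table must sit on the outer side. A hedged usage sketch follows; the JoinCondDesc construction is illustrative, since in the compiler the conditions come from the parsed join tree.

    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
    import org.apache.hadoop.hive.ql.plan.JoinDesc;

    public class CheckMapJoinExample {
      public static void main(String[] args) throws SemanticException {
        // a LEFT OUTER join of table 0 with table 1
        JoinCondDesc[] condns = { new JoinCondDesc(0, 1, JoinDesc.LEFT_OUTER_JOIN) };

        MapJoinProcessor.checkMapJoin(0, condns); // stream the left table: allowed
        try {
          MapJoinProcessor.checkMapJoin(1, condns); // stream the right table
        } catch (SemanticException e) {
          // NO_OUTER_MAPJOIN: unmatched left rows could not be emitted
        }
      }
    }
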
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Mar 5 00:53:43 2010
@@ -58,6 +58,9 @@
     transformations.add(new MapJoinProcessor());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
       transformations.add(new BucketMapJoinOptimizer());
+      if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+        transformations.add(new SortedMergeBucketMapJoinOptimizer());
+      }
     }
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());
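Optimizer.java nests the new rule inside the bucket-map-join check, so hive.optimize.bucketmapjoin.sortedmerge has no effect unless hive.optimize.bucketmapjoin is also set; the added smb_mapjoin_*.q tests presumably flip both flags via set commands before issuing the hinted join. A minimal sketch of arming both flags programmatically, assuming the standard HiveConf setters:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableSmbJoin {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // hive.optimize.bucketmapjoin: the prerequisite, checked first
        conf.setBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN, true);
        // hive.optimize.bucketmapjoin.sortedmerge: registers SortedMergeBucketMapJoinOptimizer
        conf.setBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN, true);
      }
    }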