Subject: svn commit: r1663520 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/ exec/mr/ exec/spark/ exec/tez/ io/merge/
Date: Tue, 03 Mar 2015 03:23:20 -0000
To: commits@hive.apache.org
From: gunther@apache.org

Author: gunther
Date: Tue Mar  3 03:23:20 2015
New Revision: 1663520

URL: http://svn.apache.org/r1663520
Log:
HIVE-9810: Prep object registry for multi threading (Gunther Hagleitner, reviewed by Vikram Dixit K)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java
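The patch reshapes the ObjectCache API: the old put/get pair (cache(key, value) / retrieve(key)) becomes a get-or-compute call, retrieve(key, Callable), plus a release(key) hook, and cache keys are now prefixed with the query id so that concurrent queries sharing a container do not collide. A minimal sketch of the resulting calling convention (not part of the commit; class and variable names are illustrative, the Hive classes are the ones touched in the diffs below):

    import java.util.concurrent.Callable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.ObjectCache;
    import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.MapWork;

    public class CacheUsageSketch {
      public static MapWork fetchPlan(final Configuration conf) throws HiveException {
        ObjectCache cache = ObjectCacheFactory.getCache(conf);
        // Query-scoped key: two queries sharing a container get distinct entries.
        String key = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID) + "__MAP_PLAN__";
        // On Tez the Callable runs only on a cache miss; on MR the cache is a
        // no-op, so the Callable runs every time.
        MapWork work = (MapWork) cache.retrieve(key, new Callable<Object>() {
          public Object call() {
            return Utilities.getMapWork(conf);
          }
        });
        // Callers hand the key back when the task is done with the object;
        // the record processors below do this in close().
        cache.release(key);
        return work;
      }
    }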
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Tue Mar  3 03:23:20 2015
@@ -48,9 +48,6 @@ public abstract class AbstractMapJoinOpe
 
   transient int numMapRowsRead;
 
-  transient boolean firstRow;
-
-
   public AbstractMapJoinOperator() {
   }
 
@@ -72,7 +69,6 @@ public abstract class AbstractMapJoinOpe
     super.initializeOp(hconf);
 
     numMapRowsRead = 0;
-    firstRow = true;
 
     // all other tables are small, and are cached in the hash table
     posBigTable = (byte) conf.getPosBigTable();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java Tue Mar  3 03:23:20 2015
@@ -500,8 +500,11 @@ public class CommonMergeJoinOperator ext
       }
       Map<Integer, DummyStoreOperator> dummyOps = parent.getTagToOperatorTree();
       for (Entry<Integer, DummyStoreOperator> connectOp : dummyOps.entrySet()) {
-        parentOperators.add(connectOp.getKey(), connectOp.getValue());
-        connectOp.getValue().getChildOperators().add(this);
+        if (connectOp.getValue().getChildOperators() == null
+            || connectOp.getValue().getChildOperators().isEmpty()) {
+          parentOperators.add(connectOp.getKey(), connectOp.getValue());
+          connectOp.getValue().getChildOperators().add(this);
+        }
       }
       super.initializeLocalWork(hconf);
       return;
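The guard added above is what lets an operator tree be initialized more than once when plans start being served from a shared cache: wiring the dummy-store parents becomes idempotent, so a tree handed to a second task does not accumulate duplicate parent/child edges. Reduced to a hypothetical node type, the pattern is simply:

    import java.util.ArrayList;
    import java.util.List;

    public class IdempotentWiringSketch {
      static class Node {
        final List<Node> children = new ArrayList<Node>();
      }

      // Link an edge only if this parent has not been wired yet, so a
      // repeated initialization pass is a no-op instead of a duplication.
      static void connect(List<Node> parents, Node parent, Node child) {
        if (parent.children.isEmpty()) {
          parents.add(parent);
          parent.children.add(child);
        }
      }
    }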
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Tue Mar  3 03:23:20 2015
@@ -21,10 +21,14 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
@@ -55,8 +59,7 @@ public class MapJoinOperator extends Abs
   private static final String CLASS_NAME = MapJoinOperator.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
-  private transient String tableKey;
-  private transient String serdeKey;
+  private transient String cacheKey;
   private transient ObjectCache cache;
   protected HashTableLoader loader;
 
@@ -99,28 +102,53 @@ public class MapJoinOperator extends Abs
 
     // On Tez only: The hash map might already be cached in the container we run
     // the task in. On MR: The cache is a no-op.
-    tableKey = "__HASH_MAP_"+this.getOperatorId()+"_container";
-    serdeKey = "__HASH_MAP_"+this.getOperatorId()+"_serde";
+    cacheKey = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID)
+      + "__HASH_MAP_"+this.getOperatorId()+"_container";
 
     cache = ObjectCacheFactory.getCache(hconf);
     loader = HashTableLoaderFactory.getLoader(hconf);
     hashMapRowGetters = null;
 
-    mapJoinTables = (MapJoinTableContainer[]) cache.retrieve(tableKey);
-    mapJoinTableSerdes = (MapJoinTableContainerSerDe[]) cache.retrieve(serdeKey);
-    hashTblInitedOnce = true;
-
-    if (isLogInfoEnabled) {
-      LOG.info("Try to retrieve from cache");
-    }
+    mapJoinTables = new MapJoinTableContainer[tagLen];
+    mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
+    hashTblInitedOnce = false;
 
-    if (mapJoinTables == null || mapJoinTableSerdes == null) {
+    generateMapMetaData();
+
+    if (!conf.isBucketMapJoin()) {
+      /*
+       * The issue with caching in case of bucket map join is that different tasks
+       * process different buckets and if the container is reused to join a different bucket,
+       * join results can be incorrect. The cache is keyed on operator id and for bucket map join
+       * the operator does not change but data needed is different. For a proper fix, this
+       * requires changes in the Tez API with regard to finding bucket id and
+       * also ability to schedule tasks to re-use containers that have cached the specific bucket.
+       */
       if (isLogInfoEnabled) {
-        LOG.info("Did not find tables in cache");
+        LOG.info("This is not bucket map join, so cache");
       }
-      mapJoinTables = new MapJoinTableContainer[tagLen];
-      mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
-      hashTblInitedOnce = false;
+
+      Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair =
+          (Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>)
+          cache.retrieve(cacheKey, new Callable<Object>() {
+            public Object call() throws HiveException {
+              return loadHashTable();
+            }
+          });
+
+      mapJoinTables = pair.getLeft();
+      mapJoinTableSerdes = pair.getRight();
+      hashTblInitedOnce = true;
+    } else {
+      loadHashTable();
+    }
+
+    if (this.getExecContext() != null) {
+      // reset exec context so that initialization of the map operator happens
+      // properly
+      this.getExecContext().setLastInputPath(null);
+      this.getExecContext().setCurrentInputPath(null);
     }
   }
 
@@ -147,85 +175,71 @@ public class MapJoinOperator extends Abs
     return valueOI;
   }
 
-  public void generateMapMetaData() throws HiveException, SerDeException {
+  public void generateMapMetaData() throws HiveException {
     // generate the meta data for key
     // index for key is -1
-    TableDesc keyTableDesc = conf.getKeyTblDesc();
-    SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
-        null);
-    SerDeUtils.initializeSerDe(keySerializer, null, keyTableDesc.getProperties(), null);
-    MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
-    for (int pos = 0; pos < order.length; pos++) {
-      if (pos == posBigTable) {
-        continue;
-      }
-      TableDesc valueTableDesc;
-      if (conf.getNoOuterJoin()) {
-        valueTableDesc = conf.getValueTblDescs().get(pos);
-      } else {
-        valueTableDesc = conf.getValueFilteredTblDescs().get(pos);
-      }
-      SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
-          null);
-      SerDeUtils.initializeSerDe(valueSerDe, null, valueTableDesc.getProperties(), null);
-      MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
-      mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
+    try {
+      TableDesc keyTableDesc = conf.getKeyTblDesc();
+      SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+          null);
+      SerDeUtils.initializeSerDe(keySerializer, null, keyTableDesc.getProperties(), null);
+      MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
+      for (int pos = 0; pos < order.length; pos++) {
+        if (pos == posBigTable) {
+          continue;
        }
+        TableDesc valueTableDesc;
+        if (conf.getNoOuterJoin()) {
+          valueTableDesc = conf.getValueTblDescs().get(pos);
+        } else {
+          valueTableDesc = conf.getValueFilteredTblDescs().get(pos);
+        }
+        SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
+            null);
+        SerDeUtils.initializeSerDe(valueSerDe, null, valueTableDesc.getProperties(), null);
+        MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
+        mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
+      }
+    } catch (SerDeException e) {
+      throw new HiveException(e);
     }
   }
 
-  private void loadHashTable() throws HiveException {
+  private Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>
+    loadHashTable() throws HiveException {
 
-    if ((this.getExecContext() == null)
-        || (this.getExecContext().getLocalWork() == null)
-        || (this.getExecContext().getLocalWork().getInputFileChangeSensitive() == false)
-    ) {
-      /*
-       * This early-exit criteria is not applicable if the local work is sensitive to input file changes.
-       * But the check does no apply if there is no local work, or if this is a reducer vertex (execContext is null).
-       */
-      if (hashTblInitedOnce) {
-        return;
-      } else {
-        hashTblInitedOnce = true;
-      }
+    if (this.hashTblInitedOnce
+        && ((this.getExecContext() == null)
+            || (this.getExecContext().getLocalWork() == null)
+            || (this.getExecContext().getLocalWork().getInputFileChangeSensitive()
+                == false))) {
+      // no need to reload
+      return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
+          mapJoinTables, mapJoinTableSerdes);
     }
+
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(getExecContext(), hconf, this);
     long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
        * conf.getHashTableMemoryUsage());
     loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
-    if (!conf.isBucketMapJoin()) {
-      /*
-       * The issue with caching in case of bucket map join is that different tasks
-       * process different buckets and if the container is reused to join a different bucket,
-       * join results can be incorrect. The cache is keyed on operator id and for bucket map join
-       * the operator does not change but data needed is different. For a proper fix, this
-       * requires changes in the Tez API with regard to finding bucket id and
-       * also ability to schedule tasks to re-use containers that have cached the specific bucket.
-       */
-      if (isLogInfoEnabled) {
-        LOG.info("This is not bucket map join, so cache");
-      }
-      cache.cache(tableKey, mapJoinTables);
-      cache.cache(serdeKey, mapJoinTableSerdes);
-    }
+
+    hashTblInitedOnce = true;
+
+    Pair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]> pair
+      = new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
+          mapJoinTables, mapJoinTableSerdes);
+
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
+
+    return pair;
   }
 
   // Load the hash table
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
-    try {
-      if (firstRow) {
-        // generate the map metadata
-        generateMapMetaData();
-        firstRow = false;
-      }
-      loadHashTable();
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    }
+    loadHashTable();
   }
 
   protected void setMapJoinKey(
@@ -248,12 +262,6 @@ public class MapJoinOperator extends Abs
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
-      if (firstRow) {
-        generateMapMetaData();
-        loadHashTable();
-        firstRow = false;
-      }
-
       alias = (byte) tag;
       if (hashMapRowGetters == null) {
         hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
@@ -337,6 +345,7 @@ public class MapJoinOperator extends Abs
         }
       }
     }
+    cache.release(cacheKey);
     super.closeOp(abort);
   }
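Two details of the MapJoinOperator rework are worth calling out. First, the hash tables and their serdes now travel under a single query-scoped key as one ImmutablePair, where the old code used two keys (tableKey/serdeKey) that could in principle be observed out of sync. Second, the load itself moved into the Callable, so it runs only when no task in the container has already produced the tables for this query, and bucket map joins skip the cache entirely for the correctness reasons spelled out in the comment. A compressed sketch of the get-or-load shape with stand-in array types (the real code uses MapJoinTableContainer[] and MapJoinTableContainerSerDe[]):

    import java.util.concurrent.Callable;

    import org.apache.commons.lang3.tuple.ImmutablePair;
    import org.apache.commons.lang3.tuple.Pair;

    public class PairCacheSketch {
      // Stand-in for org.apache.hadoop.hive.ql.exec.ObjectCache.
      interface Cache {
        Object retrieve(String key, Callable<?> fn) throws Exception;
      }

      @SuppressWarnings("unchecked")
      static Pair<Object[], Object[]> getOrLoad(Cache cache, String key) throws Exception {
        return (Pair<Object[], Object[]>) cache.retrieve(key, new Callable<Object>() {
          public Object call() {
            Object[] tables = new Object[4]; // stands in for the loaded hash tables
            Object[] serdes = new Object[4]; // stands in for the matching serdes
            return new ImmutablePair<Object[], Object[]>(tables, serdes);
          }
        });
      }
    }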
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java Tue Mar  3 03:23:20 2015
@@ -17,21 +17,24 @@
  */
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
 /**
  * ObjectCache. Interface for maintaining objects associated with a task.
  */
 public interface ObjectCache {
+
   /**
-   * Add an object to the cache
    * @param key
-   * @param value
    */
-  public void cache(String key, Object value);
+  public void release(String key);
 
   /**
    * Retrieve object from cache.
    * @param key
+   * @param fn function to generate the object if it's not there
    * @return the last cached object with the key, null if none.
    */
-  public Object retrieve(String key);
+  public Object retrieve(String key, Callable<?> fn) throws HiveException;
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Mar  3 03:23:20 2015
@@ -101,8 +101,6 @@ public class SMBMapJoinOperator extends
 
     super.initializeOp(hconf);
 
-    firstRow = true;
-
     closeCalled = false;
 
     this.firstFetchHappened = false;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Mar  3 03:23:20 2015
@@ -30,8 +30,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.ObjectCache;
-import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -93,19 +91,13 @@ public class ExecMapper extends MapReduc
 
     setDone(false);
 
-    ObjectCache cache = ObjectCacheFactory.getCache(job);
-
     try {
       jc = job;
       execContext.setJc(jc);
+
       // create map and fetch operators
-      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
-      if (mrwork == null) {
-        mrwork = Utilities.getMapWork(job);
-        cache.cache(PLAN_KEY, mrwork);
-      } else {
-        Utilities.setMapWork(job, mrwork);
-      }
+      MapWork mrwork = Utilities.getMapWork(job);
+
       if (mrwork.getVectorMode()) {
         mo = new VectorMapOperator();
       } else {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Mar  3 03:23:20 2015
@@ -30,8 +30,6 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.ObjectCache;
-import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
@@ -111,14 +109,7 @@ public class ExecReducer extends MapRedu
     }
 
     jc = job;
-    ObjectCache cache = ObjectCacheFactory.getCache(jc);
-    ReduceWork gWork = (ReduceWork) cache.retrieve(PLAN_KEY);
-    if (gWork == null) {
-      gWork = Utilities.getReduceWork(job);
-      cache.cache(PLAN_KEY, gWork);
-    } else {
-      Utilities.setReduceWork(job, gWork);
-    }
+    ReduceWork gWork = Utilities.getReduceWork(job);
 
     reducer = gWork.getReducer();
     reducer.setParentOperators(null); // clear out any parents as reducer is the

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java Tue Mar  3 03:23:20 2015
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.hive.ql.exec.mr;
 
+import java.util.concurrent.Callable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
  * ObjectCache. No-op implementation on MR we don't have a means to reuse
@@ -32,14 +34,16 @@ public class ObjectCache implements org.
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
 
   @Override
-  public void cache(String key, Object value) {
-    LOG.info("Ignoring cache key: "+key);
+  public void release(String key) {
+    // nothing to do
   }
 
   @Override
-  public Object retrieve(String key) {
-    LOG.info("Ignoring retrieval request: "+key);
-    return null;
+  public Object retrieve(String key, Callable<?> fn) throws HiveException {
+    try {
+      return fn.call();
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
   }
-
 }
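Since an MR task JVM serves exactly one task, the MR ObjectCache above degenerates to "always compute": retrieve just invokes the Callable and release does nothing. That keeps every call site engine-agnostic; only the Tez implementation further down actually stores anything. The entire contract fits in a few lines (a sketch, not the committed class):

    import java.util.concurrent.Callable;

    public class NoopCacheSketch {
      public void release(String key) {
        // nothing was cached, so nothing to release
      }

      public Object retrieve(String key, Callable<?> fn) throws Exception {
        return fn.call(); // no reuse across MR tasks; recompute every time
      }
    }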
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Tue Mar  3 03:23:20 2015
@@ -22,8 +22,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.ObjectCache;
-import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -68,19 +66,13 @@ public class SparkMapRecordHandler exten
     super.init(job, output, reporter);
 
     isLogInfoEnabled = LOG.isInfoEnabled();
-    ObjectCache cache = ObjectCacheFactory.getCache(job);
 
     try {
       jc = job;
       execContext = new ExecMapperContext(jc);
       // create map and fetch operators
-      MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
-      if (mrwork == null) {
-        mrwork = Utilities.getMapWork(job);
-        cache.cache(PLAN_KEY, mrwork);
-      } else {
-        Utilities.setMapWork(job, mrwork);
-      }
+      MapWork mrwork = Utilities.getMapWork(job);
+
      if (mrwork.getVectorMode()) {
        mo = new VectorMapOperator();
      } else {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java Tue Mar  3 03:23:20 2015
@@ -24,8 +24,6 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
-import org.apache.hadoop.hive.ql.exec.ObjectCache;
-import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
@@ -58,18 +56,10 @@ public class SparkMergeFileRecordHandler
   public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
     super.init(job, output, reporter);
 
-    ObjectCache cache = ObjectCacheFactory.getCache(job);
-
     try {
       jc = job;
 
-      MapWork mapWork = (MapWork) cache.retrieve(PLAN_KEY);
-      if (mapWork == null) {
-        mapWork = Utilities.getMapWork(job);
-        cache.cache(PLAN_KEY, mapWork);
-      } else {
-        Utilities.setMapWork(job, mapWork);
-      }
+      MapWork mapWork = Utilities.getMapWork(job);
 
       if (mapWork instanceof MergeFileWork) {
         MergeFileWork mergeFileWork = (MergeFileWork) mapWork;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Tue Mar  3 03:23:20 2015
@@ -27,8 +27,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.ObjectCache;
-import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -114,14 +112,7 @@ public class SparkReduceRecordHandler ex
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
 
-    ObjectCache cache = ObjectCacheFactory.getCache(jc);
-    ReduceWork gWork = (ReduceWork) cache.retrieve(PLAN_KEY);
-    if (gWork == null) {
-      gWork = Utilities.getReduceWork(job);
-      cache.cache(PLAN_KEY, gWork);
-    } else {
-      Utilities.setReduceWork(job, gWork);
-    }
+    ReduceWork gWork = Utilities.getReduceWork(job);
 
     reducer = gWork.getReducer();
     vectorized = gWork.getVectorMode();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Mar  3 03:23:20 2015
@@ -26,10 +26,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -71,7 +73,6 @@ public class MapRecordProcessor extends
   private MapRecordSource[] sources;
   private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
   private int position = 0;
-  private boolean foundCachedMergeWork = false;
   MRInputLegacy legacyMRInput = null;
   MultiMRInput mainWorkMultiMRInput = null;
   private ExecMapperContext execContext = null;
@@ -79,47 +80,49 @@ public class MapRecordProcessor extends
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
   List<MapWork> mergeWorkList = null;
+  List<String> cacheKeys;
+  ObjectCache cache;
+
   private static Map<Integer, DummyStoreOperator> connectOps =
-    new TreeMap<Integer, DummyStoreOperator>();
+      new TreeMap<Integer, DummyStoreOperator>();
 
-  public MapRecordProcessor(JobConf jconf) throws Exception {
+  public MapRecordProcessor(final JobConf jconf) throws Exception {
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
+
+    cacheKeys = new ArrayList<String>();
+
+    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
+    String key = queryId + MAP_PLAN_KEY;
+    cacheKeys.add(key);
+
     // create map and fetch operators
-    mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
-    if (mapWork == null) {
-      mapWork = Utilities.getMapWork(jconf);
-      cache.cache(MAP_PLAN_KEY, mapWork);
-      l4j.debug("Plan: " + mapWork);
-      for (String s: mapWork.getAliases()) {
-        l4j.debug("Alias: " + s);
-      }
-    } else {
-      Utilities.setMapWork(jconf, mapWork);
-    }
+    mapWork = (MapWork) cache.retrieve(key, new Callable<Object>() {
+      public Object call() {
+        return Utilities.getMapWork(jconf);
+      }
+    });
+    Utilities.setMapWork(jconf, mapWork);
 
     String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
     if (prefixes != null) {
       mergeWorkList = new ArrayList<MapWork>();
-      for (String prefix : prefixes.split(",")) {
-        MapWork mergeMapWork = (MapWork) cache.retrieve(prefix);
-        if (mergeMapWork != null) {
-          l4j.info("Found merge work in cache");
-          foundCachedMergeWork = true;
-          mergeWorkList.add(mergeMapWork);
+
+      for (final String prefix : prefixes.split(",")) {
+        if (prefix == null || prefix.isEmpty()) {
           continue;
         }
-        if (foundCachedMergeWork) {
-          throw new Exception(
-              "Should find all work in cache else operator pipeline will be in non-deterministic state");
-        }
-        if ((prefix != null) && (prefix.isEmpty() == false)) {
-          mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix);
-          mergeWorkList.add(mergeMapWork);
-          cache.cache(prefix, mergeMapWork);
-        }
+
+        key = queryId + prefix;
+        cacheKeys.add(key);
+
+        mergeWorkList.add(
+            (MapWork) cache.retrieve(key,
+                new Callable<Object>() {
+                  public Object call() {
+                    return Utilities.getMergeWork(jconf, prefix);
+                  }
+                }));
       }
     }
   }
@@ -174,10 +177,10 @@ public class MapRecordProcessor extends
           l4j.info("Input name is " + mergeMapWork.getName());
           jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
           mergeMapOp.setChildren(jconf);
-          if (foundCachedMergeWork == false) {
-            DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
-            connectOps.put(mergeMapWork.getTag(), dummyOp);
-          }
+
+          DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
+          connectOps.put(mergeMapWork.getTag(), dummyOp);
+
           mergeMapOp.setExecContext(new ExecMapperContext(jconf));
           mergeMapOp.initializeLocalWork(jconf);
         }
@@ -264,16 +267,16 @@ public class MapRecordProcessor extends
   @SuppressWarnings("deprecation")
   private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders, MapOperator mapOp)
-    throws Exception {
+      throws Exception {
     List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders);
     // this sets up the map operator contexts correctly
     mapOp.initializeContexts();
     Deserializer deserializer = mapOp.getCurrentDeserializer();
     KeyValueReader reader =
-      new KeyValueInputMerger(kvReaderList, deserializer,
-        new ObjectInspector[] { deserializer.getObjectInspector() }, mapOp
-          .getConf()
-          .getSortCols());
+        new KeyValueInputMerger(kvReaderList, deserializer,
+            new ObjectInspector[] { deserializer.getObjectInspector() }, mapOp
+                .getConf()
+                .getSortCols());
+
     return reader;
   }
@@ -290,7 +293,6 @@ public class MapRecordProcessor extends
 
   @Override
   void run() throws Exception {
-
     while (sources[position].pushRecord()) {}
   }
 
@@ -301,6 +303,12 @@ public class MapRecordProcessor extends
       abort = execContext.getIoCxt().getIOExceptions();
     }
 
+    if (cache != null && cacheKeys != null) {
+      for (String k: cacheKeys) {
+        cache.release(k);
+      }
+    }
+
     // detecting failed executions by exceptions thrown by the operator tree
     try {
       if (mapOp == null || mapWork == null) {
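MapRecordProcessor shows the full lifecycle the Tez processors now follow: build each cache key as queryId + plan key, remember every key handed out, and release them all in close(). (One reading note: the constructor assigns the cache to a local ObjectCache variable that shadows the new cache field, so the null check in close() appears to make the release loop a no-op here; the guard at least keeps that safe.) A hypothetical skeleton of the pattern:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;

    public class ProcessorLifecycleSketch {
      // Stand-in for org.apache.hadoop.hive.ql.exec.ObjectCache.
      interface Cache {
        Object retrieve(String key, Callable<?> fn) throws Exception;
        void release(String key);
      }

      private final List<String> cacheKeys = new ArrayList<String>();
      private Cache cache;
      private Object plan;

      void init(Cache cache, String queryId) throws Exception {
        this.cache = cache;
        String key = queryId + "__MAP_PLAN__"; // query-scoped, like the code above
        cacheKeys.add(key);
        plan = cache.retrieve(key, new Callable<Object>() {
          public Object call() {
            return loadPlanFromDisk(); // hypothetical expensive deserialization
          }
        });
      }

      void close() {
        for (String k : cacheKeys) {
          cache.release(k); // give every key back once the task is done
        }
      }

      static Object loadPlanFromDisk() {
        return new Object();
      }
    }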
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Tue Mar  3 03:23:20 2015
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -55,13 +57,15 @@ public class MergeFileRecordProcessor ex
   protected Operator<? extends OperatorDesc> mergeOp;
   private ExecMapperContext execContext = null;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+  private String cacheKey;
   private MergeFileWork mfWork;
   MRInputLegacy mrInput = null;
   private boolean abort = false;
   private final Object[] row = new Object[2];
+  ObjectCache cache;
 
   @Override
-  void init(JobConf jconf, ProcessorContext processorContext,
+  void init(final JobConf jconf, ProcessorContext processorContext,
       MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
       Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
@@ -85,17 +89,20 @@ public class MergeFileRecordProcessor ex
     }
 
     org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
-      .getCache(jconf);
+        .getCache(jconf);
+
     try {
       execContext.setJc(jconf);
-      // create map and fetch operators
-      MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
-      if (mapWork == null) {
-        mapWork = Utilities.getMapWork(jconf);
-        cache.cache(MAP_PLAN_KEY, mapWork);
-      } else {
-        Utilities.setMapWork(jconf, mapWork);
-      }
+
+      String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
+      cacheKey = queryId + MAP_PLAN_KEY;
+
+      MapWork mapWork = (MapWork) cache.retrieve(cacheKey, new Callable<Object>() {
+        public Object call() {
+          return Utilities.getMapWork(jconf);
+        }
+      });
+      Utilities.setMapWork(jconf, mapWork);
 
       if (mapWork instanceof MergeFileWork) {
         mfWork = (MergeFileWork) mapWork;
@@ -144,6 +151,11 @@ public class MergeFileRecordProcessor ex
 
   @Override
   void close() {
+
+    if (cache != null && cacheKey != null) {
+      cache.release(cacheKey);
+    }
+
     // check if there are IOExceptions
     if (!abort) {
       abort = execContext.getIoCxt().getIOExceptions();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java Tue Mar  3 03:23:20 2015
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.util.concurrent.Callable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.runtime.api.ObjectRegistry;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 import com.google.common.base.Preconditions;
 
@@ -29,17 +32,17 @@ import com.google.common.base.Preconditi
  *
 */
 public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
-  
+
   private static final Log LOG = LogFactory.getLog(ObjectCache.class.getName());
-  
+
   // ObjectRegistry is available via the Input/Output/ProcessorContext.
   // This is setup as part of the Tez Processor construction, so that it is available whenever an
   // instance of the ObjectCache is created. The assumption is that Tez will initialize the Processor
   // before anything else.
   private volatile static ObjectRegistry staticRegistry;
-  
+
   private final ObjectRegistry registry;
-  
+
   public ObjectCache() {
     Preconditions.checkNotNull(staticRegistry,
         "Object registry not setup yet. This should have been setup by the TezProcessor");
     registry = staticRegistry;
   }
 
@@ -49,18 +52,27 @@ public class ObjectCache implements org.
   public static void setupObjectRegistry(ObjectRegistry objectRegistry) {
     staticRegistry = objectRegistry;
   }
-  
+
   @Override
-  public void cache(String key, Object value) {
-    LOG.info("Adding " + key + " to cache with value " + value);
-    registry.cacheForVertex(key, value);
+  public void release(String key) {
+    // nothing to do
+    LOG.info("Releasing key: " + key);
   }
 
   @Override
-  public Object retrieve(String key) {
-    Object o = registry.get(key);
-    if (o != null) {
-      LOG.info("Found " + key + " in cache with value: " + o);
+  public Object retrieve(String key, Callable<?> fn) throws HiveException {
+    Object o;
+    try {
+      o = registry.get(key);
+      if (o == null) {
+        o = fn.call();
+        LOG.info("Caching key: " + key);
+        registry.cacheForVertex(key, o);
+      } else {
+        LOG.info("Found " + key + " in cache with value: " + o);
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
    }
     return o;
   }
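The Tez implementation above turns retrieve into get-or-compute over the vertex-scoped ObjectRegistry. Note the check-then-act window: registry.get and cacheForVertex are separate calls, so two threads missing at the same time could both run the Callable; since this commit is explicitly prep for multi threading, callers should assume the Callable may execute more than once. A reduced model of the same shape over a plain map (ConcurrentHashMap stands in for the registry here, and putIfAbsent is added to keep a single winner, which the committed version does not do):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;

    public class RegistryCacheSketch {
      private final ConcurrentHashMap<String, Object> registry =
          new ConcurrentHashMap<String, Object>();

      public Object retrieve(String key, Callable<?> fn) throws Exception {
        Object o = registry.get(key);
        if (o == null) {
          o = fn.call(); // may race with another thread's miss
          Object prior = registry.putIfAbsent(key, o);
          if (prior != null) {
            o = prior; // keep the first value that made it into the cache
          }
        }
        return o;
      }
    }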
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Mar  3 03:23:20 2015
@@ -22,9 +22,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -55,6 +57,10 @@ public class ReduceRecordProcessor exte
 
   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
+  private ObjectCache cache;
+
+  private String cacheKey;
+
   public static final Log l4j = LogFactory.getLog(ReduceRecordProcessor.class);
 
   private ReduceWork redWork;
@@ -68,20 +74,22 @@ public class ReduceRecordProcessor exte
   private boolean abort;
 
   @Override
-  void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
-      Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
+  void init(final JobConf jconf, ProcessorContext processorContext,
+      MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
 
-    redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
-    if (redWork == null) {
-      redWork = Utilities.getReduceWork(jconf);
-      cache.cache(REDUCE_PLAN_KEY, redWork);
-    } else {
-      Utilities.setReduceWork(jconf, redWork);
-    }
+    String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
+    cacheKey = queryId + REDUCE_PLAN_KEY;
+
+    redWork = (ReduceWork) cache.retrieve(cacheKey, new Callable<Object>() {
+      public Object call() {
+        return Utilities.getReduceWork(jconf);
+      }
+    });
+    Utilities.setReduceWork(jconf, redWork);
 
     reducer = redWork.getReducer();
     reducer.getParentOperators().clear();
@@ -188,6 +196,10 @@ public class ReduceRecordProcessor exte
 
   @Override
   void close(){
+    if (cache != null) {
+      cache.release(cacheKey);
+    }
+
     try {
       for (ReduceRecordSource rs: sources) {
         abort = abort && rs.close();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezCacheAccess.java Tue Mar  3 03:23:20 2015
@@ -22,59 +22,61 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
 import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 /**
  * Access to the Object cache from Tez, along with utility methods for accessing specific Keys.
 */
 public class TezCacheAccess {
 
-  private TezCacheAccess(ObjectCache cache) {
+  private TezCacheAccess(ObjectCache cache, String qId) {
+    this.qId = qId;
     this.cache = cache;
   }
 
   private ObjectCache cache;
+  private String qId;
 
   public static TezCacheAccess createInstance(Configuration conf) {
     ObjectCache cache = ObjectCacheFactory.getCache(conf);
-    return new TezCacheAccess(cache);
+    String qId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
+    return new TezCacheAccess(cache, qId);
   }
 
   private static final String CACHED_INPUT_KEY = "CACHED_INPUTS";
-  
+
   private final ReentrantLock cachedInputLock = new ReentrantLock();
 
-  public boolean isInputCached(String inputName) {
+  private Set<String> get() throws HiveException {
+    return (Set<String>) cache.retrieve(CACHED_INPUT_KEY,
+        new Callable<Object>() {
+          public Object call() {
+            return Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+          }
+        });
+  }
+
+  public boolean isInputCached(String inputName) throws HiveException {
     this.cachedInputLock.lock();
     try {
-      @SuppressWarnings("unchecked")
-      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
-      if (cachedInputs == null) {
-        return false;
-      } else {
-        return cachedInputs.contains(inputName);
-      }
+      return get().contains(qId+inputName);
     } finally {
       this.cachedInputLock.unlock();
     }
   }
 
-  public void registerCachedInput(String inputName) {
+  public void registerCachedInput(String inputName) throws HiveException {
     this.cachedInputLock.lock();
     try {
-      @SuppressWarnings("unchecked")
-      Set<String> cachedInputs = (Set<String>) cache.retrieve(CACHED_INPUT_KEY);
-      if (cachedInputs == null) {
-        cachedInputs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-        cache.cache(CACHED_INPUT_KEY, cachedInputs);
-      }
-      cachedInputs.add(inputName);
+      get().add(qId+inputName);
     } finally {
       this.cachedInputLock.unlock();
    }
  }
-
 }
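TezCacheAccess now scopes the shared "which inputs are cached" bookkeeping by query rather than by key alone: the set itself lives under the single CACHED_INPUT_KEY entry, created on demand through the new get-or-compute call, and every entry is prefixed with the query id, so two queries in one container track their inputs independently. A usage sketch (the input name is made up; TezCacheAccess and HiveException are the classes from the diff above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.tez.TezCacheAccess;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class CachedInputSketch {
      public static void markOnce(Configuration conf) throws HiveException {
        TezCacheAccess access = TezCacheAccess.createInstance(conf);
        if (!access.isInputCached("Map 1")) {
          // ... cache the input's hash table, then record that it is done ...
          access.registerCachedInput("Map 1");
        }
      }
    }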
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java?rev=1663520&r1=1663519&r2=1663520&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java Tue Mar  3 03:23:20 2015
@@ -21,8 +21,6 @@ package org.apache.hadoop.hive.ql.io.mer
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
-import org.apache.hadoop.hive.ql.exec.ObjectCache;
-import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -54,17 +52,7 @@ public class MergeFileMapper extends Map
   @Override
   public void configure(JobConf job) {
     jc = job;
-    ObjectCache cache = ObjectCacheFactory.getCache(job);
-    MapWork mapWork = (MapWork) cache.retrieve(PLAN_KEY);
-
-    // if map work is found in object cache then return it else retrieve the
-    // plan from filesystem and cache it
-    if (mapWork == null) {
-      mapWork = Utilities.getMapWork(job);
-      cache.cache(PLAN_KEY, mapWork);
-    } else {
-      Utilities.setMapWork(job, mapWork);
-    }
+    MapWork mapWork = Utilities.getMapWork(job);
 
     try {
       if (mapWork instanceof MergeFileWork) {