hive-issues mailing list archives

From "liyunzhang (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (HIVE-18301) Investigate to enable MapInput cache in Hive on Spark
Date Wed, 24 Jan 2018 05:48:00 GMT

    [ https://issues.apache.org/jira/browse/HIVE-18301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336946#comment-16336946
] 

liyunzhang edited comment on HIVE-18301 at 1/24/18 5:47 AM:
------------------------------------------------------------

HIVE-18301.patch provides one way to transfer {{IOContext::inputPath}} between the two branches of the following plan:
{code:java}
  inputRDD1                inputRDD2
      | CopyFunction           | CopyFunction
  CopyRDD1                 CopyRDD2
      |                        |
    MT_11                    MT_12
      |                        |
    RT_1                     RT_2
       \                      /
                Union
{code}
MT_11 goes through the following call chain to initialize {{IOContext::inputPath}}:
{code:java}
 CombineHiveRecordReader#init
->HiveContextAwareRecordReader.initIOContext
->IOContext.setInputPath
{code}
inputRDD1 and inputRDD2 are RDDs of the same table, so CopyRDD1 and CopyRDD2 are the same RDD
if RDD cache is enabled. In that case MT_12 will not call CombineHiveRecordReader#init to initialize
{{IOContext::inputPath}}, but {{MapOperator#process(Writable value)}} still needs this value.
IOContext is bound to a single thread, so its value differs between threads.
{{inputRDD1-CopyRDD1-MT_11-RT_1}} and {{inputRDD2-CopyRDD2-MT_12-RT_2}} run in different threads,
so IOContext cannot be shared between them.
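
The thread-binding problem can be reproduced outside Hive. The following is only a minimal sketch, assuming a plain {{ThreadLocal}} stands in for the per-thread IOContext; the class and method names are hypothetical and are not Hive or Spark APIs. One thread sets the path (like MT_11 through the record reader), a second thread only reads it (like MT_12 consuming cached rows) and sees null.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a thread-bound IOContext; not Hive's actual class.
class ThreadBoundContextDemo {
  private static final ThreadLocal<String> INPUT_PATH = new ThreadLocal<>();

  /** Returns {path seen by the reading thread, path seen by the cache-consuming thread}. */
  static String[] observePaths(String path) {
    ExecutorService reader = Executors.newSingleThreadExecutor();
    ExecutorService cacheConsumer = Executors.newSingleThreadExecutor();
    try {
      // Branch 1 (like MT_11): the record reader initializes the path on its own thread.
      Callable<String> initAndRead = () -> {
        INPUT_PATH.set(path);
        return INPUT_PATH.get();
      };
      // Branch 2 (like MT_12): only reads; the reader's init never ran on this thread.
      Callable<String> readOnly = INPUT_PATH::get;

      String seenByReader = reader.submit(initAndRead).get();
      String seenByConsumer = cacheConsumer.submit(readOnly).get();
      return new String[] {seenByReader, seenByConsumer};
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      reader.shutdown();
      cacheConsumer.shutdown();
    }
  }

  public static void main(String[] args) {
    String[] seen = observePaths("/warehouse/t/part-00000");
    System.out.println("reader thread:   " + seen[0]);
    System.out.println("consumer thread: " + seen[1]);
  }
}
```

In this sketch the second value is null because a {{ThreadLocal}} without an initial value returns null on a thread that never called {{set}}, which mirrors the missing inputPath in MT_12.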

For this issue, I propose the following solution:
 We save the inputPath in CopyRDD1 when {{inputRDD1-CopyRDD1-MT_11-RT_1}} is executed. CopyRDD2
gets the cached value and inputPath from CopyRDD1, which is stored in the Spark cache manager. We
reinitialize {{IOContext::inputPath}} in {{MapOperator#process(Writable value)}} in MT_12.
 *Where to setInputPath?*
 MapInput#CopyFunction#call saves inputPath in the first element of the returned tuple:
{code:java}
 public MapInput(SparkPlan sparkPlan, JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
     this(sparkPlan, hadoopRDD, false);
@@ -79,10 +83,19 @@ public void setToCache(boolean toCache) {
     call(Tuple2<WritableComparable, Writable> tuple) throws Exception {
       if (conf == null) {
         conf = new Configuration();
+        conf.set("hive.execution.engine","spark");
       }
-
-      return new Tuple2<WritableComparable, Writable>(tuple._1(),
-          WritableUtils.clone(tuple._2(), conf));
+      //                CopyFunction       MapFunction
+      //  HADOOPRDD-----------------> RDD1-------------> RDD2.....
+      // These transformations are in one stage and are executed by one Spark task (thread),
+      // so IOContextMap.get(conf).getInputPath() will not be null.
+      String inputPath = IOContextMap.get(conf).getInputPath().toString();
+      Text inputPathText = new Text(inputPath);
+      // Save inputPath in the first element of the returned tuple.
+      // tuple._1() was not used before in SparkMapRecordHandler#processRow,
+      // so replace tuple._1() with inputPathText.
+      return new Tuple2<WritableComparable, Writable>(inputPathText,
+        WritableUtils.clone(tuple._2(), conf));
     }

   }
{code}
*Where to getInputPath?*
 SparkMapRecordHandler#processRow restores inputPath from the key when it has not been set:
{code:java}
public void processRow(Object key, Object value) throws IOException {
....
+    if (HiveConf.getBoolVar(jc, HiveConf.ConfVars.HIVE_SPARK_SHARED_WORK_OPTIMIZATION)) {
+      Path inputPath = IOContextMap.get(jc).getInputPath();
+      // A null inputPath means the record came from the RDD cache, so restore the path from the key.
+      if (inputPath == null) {
+        Text pathText = (Text) key;
+        IOContextMap.get(jc).setInputPath(new Path(pathText.toString()));
+      }
+    }
....
{code}
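
To show the whole idea in one place, here is a simplified sketch with no Hive or Spark dependencies. It assumes a {{ThreadLocal<String>}} in place of IOContext and a {{Map.Entry}} in place of Spark's {{Tuple2}}; every name in it is hypothetical. {{copy}} plays the role of CopyFunction#call and {{processRow}} the role of SparkMapRecordHandler#processRow.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the idea; none of these names are Hive or Spark APIs.
class PathInKeySketch {
  // Stand-in for the thread-bound IOContext input path.
  static final ThreadLocal<String> INPUT_PATH = new ThreadLocal<>();

  // Like CopyFunction#call: put the current input path into the otherwise unused key.
  static Map.Entry<String, String> copy(String value) {
    return new SimpleEntry<>(INPUT_PATH.get(), value);
  }

  // Like processRow: if this thread has no path yet, the row is cached, so restore it from the key.
  static String processRow(Map.Entry<String, String> row) {
    if (INPUT_PATH.get() == null) {
      INPUT_PATH.set(row.getKey());
    }
    return INPUT_PATH.get() + ":" + row.getValue();
  }

  static String demo() {
    // First branch (like MT_11): this thread knows the path and caches rows together with it.
    INPUT_PATH.set("/warehouse/t/part-00000");
    List<Map.Entry<String, String>> cache = new ArrayList<>();
    cache.add(copy("row1"));

    // Second branch (like MT_12): a fresh thread that only sees the cached rows.
    String[] result = new String[1];
    Thread second = new Thread(() -> result[0] = processRow(cache.get(0)));
    second.start();
    try {
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return result[0];
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

As in the patch, the path travels with every cached row, so the second thread needs no shared state to recover it. The trade-off is one extra path value stored per row.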
[~lirui], [~xuefuz], [~stakiar], [~csun], please share your suggestions, thanks!



> Investigate to enable MapInput cache in Hive on Spark
> -----------------------------------------------------
>
>                 Key: HIVE-18301
>                 URL: https://issues.apache.org/jira/browse/HIVE-18301
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang
>            Assignee: liyunzhang
>            Priority: Major
>         Attachments: HIVE-18301.patch
>
>
> An IOContext problem was previously found in MapTran when Spark RDD cache is enabled (HIVE-8920),
> so we disabled RDD cache in MapTran at [SparkPlanGenerator|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java#L202].
> The problem is that IOContext seems not to be initialized correctly in Spark yarn client/cluster
> mode, which causes exceptions like:
> {code}
> Job aborted due to stage failure: Task 93 in stage 0.0 failed 4 times, most recent failure: Lost task 93.3 in stage 0.0 (TID 616, bdpe48): java.lang.RuntimeException: Error processing row: java.lang.NullPointerException
> 	at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:165)
> 	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
> 	at org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
> 	at org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
> 	at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> 	at org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:101)
> 	at org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:516)
> 	at org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
> 	at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:546)
> 	at org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:152)
> 	... 12 more
> Driver stacktrace:
> {code}
> In yarn client/cluster mode, [ExecMapperContext#currentInputPath|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java#L109]
> is sometimes null when RDD cache is enabled.


