spark-dev mailing list archives

From Archit Thakur <archit279tha...@gmail.com>
Subject Re: Problems while moving from 0.8.0 to 0.8.1
Date Mon, 27 Jan 2014 20:12:47 GMT
On Tue, Jan 28, 2014 at 1:25 AM, Aaron Davidson <ilikerps@gmail.com> wrote:

> Looks like your MyCustomKeyType.equals() method doesn't correctly handle a
> null argument. In general, the contract of equals is to return false if
> called with a null argument, which this code currently relies on.
>
>
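
A null-safe equals falls out naturally from a Scala pattern match, since null
matches no typed pattern. A minimal sketch of the contract Aaron describes,
assuming a hypothetical key class with a single id field:

class SafeKey(val id: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: SafeKey => this.id == that.id // field-by-field comparison
    case _             => false              // null (and any other type) lands here
  }
  override def hashCode: Int = id.hashCode   // keep hashCode consistent with equals
}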
Ok. But what if equals() ends up being invoked on a null object rather than
being passed null as the argument? The code must internally be doing
a.equals(b) somewhere with b null. What if a happens to be null instead?
Won't that give an NPE at the call site itself, or is there a null check
there? I mean, why would a null MyCustomKeyType exist at all, when it should
hold some data? How could MyCustomKeyType be null when I never put null
anywhere?
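
The null here is not one of the stored keys: AppendOnlyMap (quoted below)
keeps its keys in a flat array whose empty slots are null, and while probing
it calls k.equals(curKey), where k is the incoming key (already checked
against null) and curKey may be an empty slot. So the NPE surfaces inside the
receiver's equals, not at the call site. A tiny sketch of that failure mode,
using a hypothetical key class whose equals assumes a non-null argument:

// hypothetical key class that mirrors the reported failure
class BadKey(val id: Int) {
  override def equals(other: Any): Boolean =
    other.asInstanceOf[BadKey].id == id // NPE when other is null
  override def hashCode: Int = id
}

object NpeDemo extends App {
  val k = new BadKey(1)
  println(k.equals(new BadKey(1))) // true: receiver and argument both non-null
  println(k.equals(null))          // NullPointerException inside BadKey.equals
}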


> This could still be patched for the sake of backwards compatibility with
> similarly incomplete equals() methods which previously worked.
>
>
>
Agreed.

Thanks and Regards,
Archit Thakur.



> On Mon, Jan 27, 2014 at 11:34 AM, Archit Thakur <archit279thakur@gmail.com> wrote:
>
> > ERROR executor.Executor: Exception in task ID 20
> > java.lang.NullPointerException
> >         at com.xyz.spark.common.collection.MyCustomKeyType.equals(MyCustomKeyType.java:200)
> >         at org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:122)
> >         at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
> >         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:103)
> >         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:102)
> >         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
> >         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
> >         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
> >         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> >         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> >         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
> >         at org.apache.spark.scheduler.Task.run(Task.scala:53)
> >         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
> >         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
> >         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >         at java.lang.Thread.run(Unknown Source)
> >
> >
> >
> > On Mon, Jan 27, 2014 at 10:54 PM, Reynold Xin <rxin@databricks.com> wrote:
> >
> > > Do you mind pasting the whole stack trace for the NPE?
> > >
> > >
> > >
> > > On Mon, Jan 27, 2014 at 6:44 AM, Archit Thakur <
> > archit279thakur@gmail.com
> > > >wrote:
> > >
> > > > Hi,
> > > >
> > > > The implementation of the aggregation logic changed in 0.8.1
> > > > (Aggregator.scala): it now uses AppendOnlyMap instead of the
> > > > java.util.HashMap used in the 0.8.0 release.
> > > >
> > > > Aggregator.scala
> > > > def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
> > > >     val combiners = new AppendOnlyMap[K, C]
> > > >     var kv: Product2[K, V] = null
> > > >     val update = (hadValue: Boolean, oldValue: C) => {
> > > >       if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
> > > >     }
> > > >     while (iter.hasNext) {
> > > >       kv = iter.next()
> > > >       combiners.changeValue(kv._1, update)
> > > >     }
> > > >     combiners.iterator
> > > >   }
> > > >
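As long as no null key is ever inserted, java.util.HashMap never passes null
to a key's equals, which is likely why the same key class worked on 0.8.0.
The same combine loop can be sketched over a plain HashMap, roughly the
pre-0.8.1 structure the thread describes, with hypothetical sum-combiner
functions over (String, Int) pairs:

import java.util.{HashMap => JHashMap}

object HashMapCombineDemo extends App {
  // hypothetical combiner functions: identity to create, + to merge
  val createCombiner = (v: Int) => v
  val mergeValue = (c: Int, v: Int) => c + v

  val iter = Iterator(("a", 1), ("b", 2), ("a", 3))
  val combiners = new JHashMap[String, Int]()
  while (iter.hasNext) {
    val kv = iter.next()
    val newValue =
      if (combiners.containsKey(kv._1)) mergeValue(combiners.get(kv._1), kv._2)
      else createCombiner(kv._2)
    combiners.put(kv._1, newValue) // equals is only ever called on stored, non-null keys
  }
  println(combiners) // expected: {a=4, b=2}
}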
> > > > The problem I am facing is that in the changeValue function of
> > > > AppendOnlyMap, it computes
> > > > val curKey = data(2 * pos)
> > > > which comes back null, and the k.equals(curKey) call on it eventually
> > > > gives the NPE.
> > > >
> > > > AppendOnlyMap.scala
> > > > def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
> > > >     val k = key.asInstanceOf[AnyRef]
> > > >     if (k.eq(null)) {
> > > >       if (!haveNullValue) {
> > > >         incrementSize()
> > > >       }
> > > >       nullValue = updateFunc(haveNullValue, nullValue)
> > > >       haveNullValue = true
> > > >       return nullValue
> > > >     }
> > > >     var pos = rehash(k.hashCode) & mask
> > > >     var i = 1
> > > >     while (true) {
> > > >       val curKey = data(2 * pos)
> > > >       if (k.eq(curKey) || k.equals(curKey)) {
> > > >         val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
> > > >         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
> > > >         return newValue
> > > >       } else if (curKey.eq(null)) {
> > > >         val newValue = updateFunc(false, null.asInstanceOf[V])
> > > >         data(2 * pos) = k
> > > >         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
> > > >         incrementSize()
> > > >         return newValue
> > > >       } else {
> > > >         val delta = i
> > > >         pos = (pos + delta) & mask
> > > >         i += 1
> > > >       }
> > > >     }
> > > >     null.asInstanceOf[V] // Never reached but needed to keep compiler happy
> > > >   }
> > > >
> > > >
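Putting the pieces together: changeValue probes the flat data array with a
growing step, and every probe of a not-yet-filled slot reads curKey = null
before the k.eq(curKey) || k.equals(curKey) test. A condensed, standalone
sketch of that probe loop (hypothetical names, no resizing or rehash) that
makes the equals(null) call explicit:

object ProbeSketch {
  // hypothetical mini open-addressing table: keys at even indices, values at odd
  val capacity = 8                            // must be a power of two
  val mask = capacity - 1
  val data = new Array[AnyRef](2 * capacity)  // every slot starts as null

  def insert(k: AnyRef, v: AnyRef): Unit = {
    var pos = k.hashCode & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      // curKey is null for every empty slot probed, so a key class whose
      // equals cannot handle null throws right here:
      if (k.eq(curKey) || k.equals(curKey)) {
        data(2 * pos + 1) = v; return                     // existing key: overwrite value
      } else if (curKey eq null) {
        data(2 * pos) = k; data(2 * pos + 1) = v; return  // claim the empty slot
      } else {
        pos = (pos + i) & mask; i += 1                    // growing step on collision
      }
    }
  }
}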
> > > > Other info:
> > > > 1. My code works fine with 0.8.0.
> > > > 2. I used the groupByKey transformation.
> > > > 3. I replaced Aggregator.scala with the older version (0.8.0), compiled
> > > > it, restarted the Master and Worker, and it ran successfully.
> > > >
> > > > Thanks and Regards,
> > > > Archit Thakur.
> > > >
> > >
> >
>
