spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael Allman (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-17204) Spark 2.0 off heap RDD persistence with replication factor 2 leads to in-memory data corruption
Date Tue, 29 Nov 2016 17:58:59 GMT

     [ https://issues.apache.org/jira/browse/SPARK-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Michael Allman updated SPARK-17204:
-----------------------------------
    Description: 
We use the {{OFF_HEAP}} storage level extensively with great success. We've tried off-heap
storage with replication factor 2 and have always received exceptions on the executor side
very shortly after starting the job. For example:

{code}
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9086
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

or

{code}
java.lang.IndexOutOfBoundsException: Index: 6, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
	at java.util.ArrayList.get(ArrayList.java:429)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

or

{code}
java.lang.NullPointerException
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:141)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:140)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

We've tried switching to Java serialization and get a different exception:

{code}
java.io.StreamCorruptedException: invalid stream header: 780000D0
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:808)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
	at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
	at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:433)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

This suggest some kind of memory corruption to me.

I haven't seen this problem when testing in local mode, but I believe that's because no actual
replication takes place in that mode.

I've been able to consistently reproduce this problem with a very simple code snippet. The
following works without trouble:

{code}
import org.apache.spark.storage.StorageLevel, StorageLevel.OFF_HEAP
sc.range(0, 100).persist(OFF_HEAP).count
{code}

However, this will throw an exception similar to those above:

{code}
import org.apache.spark.storage.StorageLevel
val OFF_HEAP_2 = StorageLevel(useDisk = true, useMemory = true, useOffHeap = true, deserialized
= false, replication = 2)
sc.range(0, 100).persist(OFF_HEAP_2).count
{code}

I've even received an exception when testing with {{sc.range(0, 0)}}, but others have not.

  was:
We use the OFF_HEAP storage level extensively with great success. We've tried off-heap storage
with replication factor 2 and have always received exceptions on the executor side very shortly
after starting the job. For example:

{code}
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9086
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

or

{code}
java.lang.IndexOutOfBoundsException: Index: 6, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
	at java.util.ArrayList.get(ArrayList.java:429)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

or

{code}
java.lang.NullPointerException
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:141)
	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:140)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

We've tried switching to Java serialization and get a different exception:

{code}
java.io.StreamCorruptedException: invalid stream header: 780000D0
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:808)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
	at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
	at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:433)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

This suggest some kind of memory corruption to me.

I haven't seen this problem when testing in local mode, but I believe that's because no actual
replication takes place in that mode.

I've been able to consistently reproduce this problem with a very simple code snippet. The
following works without trouble:

{code}
import org.apache.spark.storage.StorageLevel, StorageLevel.OFF_HEAP
sc.range(0, 100).persist(OFF_HEAP).count
{code}

However, this will throw an exception similar to those above:

{code}
import org.apache.spark.storage.StorageLevel
val OFF_HEAP_2 = StorageLevel(useDisk = true, useMemory = true, useOffHeap = true, deserialized
= false, replication = 2)
sc.range(0, 100).persist(OFF_HEAP_2).count
{code}

I've even received an exception when testing with `sc.range(0, 0)`, but others have not.


> Spark 2.0 off heap RDD persistence with replication factor 2 leads to in-memory data
corruption
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17204
>                 URL: https://issues.apache.org/jira/browse/SPARK-17204
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Michael Allman
>
> We use the {{OFF_HEAP}} storage level extensively with great success. We've tried off-heap
storage with replication factor 2 and have always received exceptions on the executor side
very shortly after starting the job. For example:
> {code}
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9086
> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> or
> {code}
> java.lang.IndexOutOfBoundsException: Index: 6, Size: 0
> 	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> 	at java.util.ArrayList.get(ArrayList.java:429)
> 	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> or
> {code}
> java.lang.NullPointerException
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:141)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:140)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> We've tried switching to Java serialization and get a different exception:
> {code}
> java.io.StreamCorruptedException: invalid stream header: 780000D0
> 	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:808)
> 	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
> 	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
> 	at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
> 	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
> 	at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
> 	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:433)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> This suggest some kind of memory corruption to me.
> I haven't seen this problem when testing in local mode, but I believe that's because
no actual replication takes place in that mode.
> I've been able to consistently reproduce this problem with a very simple code snippet.
The following works without trouble:
> {code}
> import org.apache.spark.storage.StorageLevel, StorageLevel.OFF_HEAP
> sc.range(0, 100).persist(OFF_HEAP).count
> {code}
> However, this will throw an exception similar to those above:
> {code}
> import org.apache.spark.storage.StorageLevel
> val OFF_HEAP_2 = StorageLevel(useDisk = true, useMemory = true, useOffHeap = true, deserialized
= false, replication = 2)
> sc.range(0, 100).persist(OFF_HEAP_2).count
> {code}
> I've even received an exception when testing with {{sc.range(0, 0)}}, but others have
not.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message