flink-user mailing list archives

From ANDREA SPINA <74...@studenti.unimore.it>
Subject Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
Date Fri, 02 Sep 2016 09:30:47 GMT
Hi Stefan,
Thank you so much for the answer. Ok, I'll do it asap.
For the sake of argument, could the issue be related to the low number of
blocks? I noticed that the Flink implementation, by default, sets the number of
blocks to the input count (which is actually a lot). So with a low block
count and big blocks, maybe they don't fit somewhere...
Thank you again.

Andrea

2016-09-02 10:51 GMT+02:00 Stefan Richter <s.richter@data-artisans.com>:

> Hi,
>
> unfortunately, the log does not contain the required information for this
> case. It seems like a sender to the SortMerger failed. The best way to find
> this problem is to take a look at the exceptions that are reported in the
> web front-end for the failing job. Could you check if you find any reported
> exceptions there and provide them to us?
>
> Best,
> Stefan
>
> On 01.09.2016, at 11:56, ANDREA SPINA <74598@studenti.unimore.it> wrote:
>
> Sure. Here <https://drive.google.com/open?id=0B6TTuPO7UoeFRXY3RW1KQnNrd3c>
> you can find the complete log file.
> I still cannot get past the issue. Thank you for your help.
>
> 2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> I don't know whether my usual error is related to this one, but it is very
>> similar and it happens randomly... I still have to figure out the root cause
>> of the error:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>> at java.util.ArrayList.elementData(ArrayList.java:418)
>> at java.util.ArrayList.get(ArrayList.java:431)
>> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>> at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>> On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <s.richter@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> could you provide the log outputs for your job (ideally with debug
>>> logging enabled)?
>>>
>>> Best,
>>> Stefan
>>>
>>> On 31.08.2016, at 14:40, ANDREA SPINA <74598@studenti.unimore.it> wrote:
>>>
>>> Hi everyone.
>>> I'm running the FlinkML ALS matrix factorization and I bumped into the
>>> following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>> at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
>>> at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTools.scala:94)
>>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
>>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433)
>>> at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
>>> at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
>>> at dima.tu.berlin.benchmark.flink.als.RUN$.main(RUN.scala:78)
>>> at dima.tu.berlin.benchmark.flink.als.RUN.main(RUN.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.RuntimeException: Initializing the input processing failed: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:325)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>> at org.apache.flink.runtime.operators.BatchTask.initLocalStrategies(BatchTask.java:819)
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:321)
>>> ... 2 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>> Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
>>> at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextLookAhead(LocalInputChannel.java:270)
>>> at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:238)
>>> at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.release(PipelinedSubpartition.java:158)
>>> at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:320)
>>> at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
>>> at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:370)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> I'm running with *flink-1.0.3*. I really can't figure out the reason
>>> behind that.
>>>
>>> My code simply calls the library as follows:
>>>
>>> val als = ALS()
>>>   .setIterations(numIterations)
>>>   .setNumFactors(rank)
>>>   .setBlocks(degreeOfParallelism)
>>>   .setSeed(42)
>>>   .setTemporaryPath(tempPath)
>>>
>>> als.fit(ratings, parameters)
>>>
>>> val (users, items) = als.factorsOption match {
>>>   case Some(_) => als.factorsOption.get
>>>   case _ => throw new RuntimeException
>>> }
>>>
>>> users.writeAsText(outputPath, WriteMode.OVERWRITE)
>>> items.writeAsText(outputPath, WriteMode.OVERWRITE)
>>>
>>> env.execute("ALS matrix factorization")
>>>
>>> where
>>> - ratings, the input dataset, contains (uid, iid, rate) rows covering about 8e6
>>> users, 1e6 items, and 700 ratings per user on average.
>>> - numIterations 10
>>> - rank 50
>>> - degreeOfParallelism 240
>>>
>>>
>>> *The error seems to be related to the final .persist() call:*
>>> at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
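>>>
>>> For context, that persist step corresponds roughly to the following sketch
>>> (my assumption: FlinkMLTools.persist materializes a DataSet to the configured
>>> temporary path and reads it back; the "/ratings" suffix is just an example):
>>>
>>> import org.apache.flink.api.scala._
>>> import org.apache.flink.ml.common.FlinkMLTools
>>>
>>> // Sketch of what ALS does internally when a temporary path is set: write the
>>> // DataSet to disk, execute, then read it back for the next stage.
>>> val persistedRatings = FlinkMLTools.persist(ratings, tempPath + "/ratings")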
>>>
>>> I'm running on a 15-node cluster - 16 CPUs per node - with the
>>> following relevant properties:
>>>
>>> jobmanager.heap.mb = 2048
>>> taskmanager.memory.fraction = 0.5
>>> taskmanager.heap.mb = 28672
>>> taskmanager.network.bufferSizeInBytes = 32768
>>> taskmanager.network.numberOfBuffers = 98304
>>> akka.ask.timeout = 300s
>>>
>>> Any help will be appreciated. Thank you.
>>>
>>> --
>>> *Andrea Spina*
>>> N.Tessera: *74598*
>>> MAT: *89369*
>>> *Ingegneria Informatica* *[LM] *(D.M. 270)
>>>
>>>
>>>
>>
>>
>>
>>
>>
>
>
> --
> *Andrea Spina*
> N.Tessera: *74598*
> MAT: *89369*
> *Ingegneria Informatica* *[LM] *(D.M. 270)
>
>
>


-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)
