flink-user mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Apache Flink 1.0.3 IllegalStateException Partition State on chaining
Date Wed, 29 Jun 2016 11:48:08 GMT
Hey Andrea! Sorry for the bad user experience.

Regarding the network buffers: you should be able to run it after
increasing the number of network buffers; just account for the extra
memory when specifying the heap size etc. You currently allocate
32768 * 16384 bytes = 512 MiB for them. If you have a very long
pipeline and high parallelism, you should increase it accordingly.
How much memory do you have on your machines?
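To make the arithmetic above concrete, here is a small Scala sketch (not part of Flink) that computes the memory reserved for network buffers from the two settings in Andrea's configuration, plus its share of `taskmanager.heap.mb`. It assumes, as the advice above implies, that the buffers have to fit into the configured TaskManager heap; the object and method names are hypothetical.

```scala
// Hypothetical helper, not a Flink API: buffer memory = count * buffer size.
object NetworkBufferMemory {
  private val BytesPerMiB: Long = 1024L * 1024L

  /** Bytes reserved for network buffers on one TaskManager. */
  def bytes(numberOfBuffers: Long, bufferSizeInBytes: Long): Long =
    numberOfBuffers * bufferSizeInBytes

  /** The same amount expressed in MiB. */
  def mebibytes(numberOfBuffers: Long, bufferSizeInBytes: Long): Long =
    bytes(numberOfBuffers, bufferSizeInBytes) / BytesPerMiB

  /** Fraction of taskmanager.heap.mb taken by the buffers
    * (assumes heap.mb is interpreted as MiB). */
  def heapShare(numberOfBuffers: Long, bufferSizeInBytes: Long, heapMb: Long): Double =
    mebibytes(numberOfBuffers, bufferSizeInBytes).toDouble / heapMb
}

// Values from the configuration quoted in this thread.
val current = NetworkBufferMemory.mebibytes(32768L, 16384L) // 512
val doubled = NetworkBufferMemory.mebibytes(65536L, 16384L) // 1024
val share   = NetworkBufferMemory.heapShare(32768L, 16384L, 28672L)
println(s"current: $current MiB, doubled: $doubled MiB, share of heap: $share")
```

With the 28672 MB heap from the configuration, 512 MiB is under 2% of the heap, so even doubling `taskmanager.network.numberOfBuffers` to 65536 (1024 MiB) leaves most of it for other uses; how many buffers parallelism 400 actually needs depends on the job's data exchanges.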

Regarding the IllegalStateException: I suspect that this is **not**
the root cause of the failure. A null ExecutionState can only occur if
the producer task (from which data is requested) failed during the
request. The error message is confusing, and I opened a JIRA issue to fix it:
https://issues.apache.org/jira/browse/FLINK-4131. Can you please check
your complete logs to find the root cause, e.g. why the producer
failed?
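One way to follow this advice is to gather the TaskManager logs from all nodes and look for the earliest task that switched to FAILED, since the task in the quoted trace may only be a downstream victim. The Scala sketch below is a hypothetical helper, not part of Flink; it assumes log lines start with a timestamp in the format seen in the quoted log (`2016-06-29 11:31:55,673`) and that the nodes' clocks are roughly in sync.

```scala
import scala.io.Source

// Hypothetical helper for triaging Flink TaskManager logs; not a Flink API.
object FirstFailure {
  // Lines begin with e.g. "2016-06-29 11:31:55,673"; this fixed-width,
  // zero-padded format sorts chronologically as a plain string.
  private val TimestampLength = "yyyy-MM-dd HH:mm:ss,SSS".length

  /** All "switched to FAILED" lines, earliest first. */
  def failedTaskLines(lines: Iterator[String]): List[String] =
    lines
      .filter(l => l.length >= TimestampLength && l.contains("switched to FAILED"))
      .toList
      .sortBy(_.take(TimestampLength))

  /** The earliest failure, if any: a candidate for the real root cause. */
  def earliest(lines: Iterator[String]): Option[String] =
    failedTaskLines(lines).headOption

  /** Reads several log files (e.g. one per TaskManager) and merges them. */
  def earliestInFiles(paths: Seq[String]): Option[String] = {
    val all = paths.flatMap { path =>
      val source = Source.fromFile(path)
      try source.getLines().toList
      finally source.close()
    }
    earliest(all.iterator)
  }
}
```

For example, `FirstFailure.earliestInFiles(Seq("tm-node01.log", "tm-node02.log"))` (hypothetical file names) returns the first FAILED line; the stack trace that follows it in that log is the place to look for why the producer failed.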


On Wed, Jun 29, 2016 at 12:19 PM, ANDREA SPINA
<74598@studenti.unimore.it> wrote:
> Hi everyone,
>
> I am running some Flink experiments with the Peel benchmarking framework
> (http://peel-framework.org/) and I am struggling with exceptions. The
> environment is a 25-node cluster with 16 cores per node. The dataset is ~80 GiB
> and is stored on HDFS 2.7.1. The Flink version is 1.0.3.
>
> At first I tried a degree of parallelism of 400, but an error about an
> insufficient number of network buffers (numberOfBuffers) was raised, so I
> reduced the parallelism to 200. The Flink configuration follows:
>
> jobmanager.rpc.address = ${runtime.hostname}
> akka.log.lifecycle.events = ON
> akka.ask.timeout = 300s
> jobmanager.rpc.port = 6002
> jobmanager.heap.mb = 1024
> jobmanager.web.port = 6004
> taskmanager.heap.mb = 28672
> taskmanager.memory.fraction = 0.7
> taskmanager.network.numberOfBuffers = 32768
> taskmanager.network.bufferSizeInBytes = 16384
> taskmanager.tmp.dirs = "/data/1/peel/flink/tmp:/data/2/peel/flink/tmp:/data/3/peel/flink/tmp:/data/4/peel/flink/tmp"
> taskmanager.debug.memory.startLogThread = true
>
> With a parallelism of 200, the following exception is raised on one node of
> the cluster:
>
> 2016-06-29 11:31:55,673 INFO  org.apache.flink.runtime.taskmanager.Task
> - CHAIN Reduce (Reduce at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialWeightsVector(sGradientDescentL2.scala:43))
> -> Map (Map at dima.tu.berlin.benchmark.flink.mlr.solver.sGradientDescentL2.createInitialVector(sGradientDescentL2.scala:69)) (1/1)
> switched to FAILED with exception.
> java.lang.IllegalStateException: Received unexpected partition state null
> for partition request. This is a bug.
>         at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:994)
>
>
> The reduce code is:
>
> 43  val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
>
> The map code is:
>
> 68  def createInitialVector(dimensionDS: DataSet[Int]): DataSet[Vector] = {
> 69    dimensionDS.map {
> 70      dimension =>
> 71        val values = DenseVector(Array.fill(dimension)(0.0))
> 72        values
> 73    }
> 74  }
> 74  }
>
> I can't figure out a solution. Thank you for your help.
>
> Andrea
>
> --
> Andrea Spina
> Card no.: 74598
> Student ID: 89369
> Computer Engineering, Master's degree [LM] (D.M. 270)
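
A note on the chained task in the quoted log: its name ends in (1/1), i.e. the Reduce -> Map chain ran as a single subtask. That is consistent with a non-grouped `reduce`, whose final result is computed by one task consuming the outputs of all upstream producers, so a failure in any one producer can surface there, as the reply suggests. The plain-Scala model below (no Flink involved) only illustrates what lines 43 and 68-74 compute; `Point` is a hypothetical stand-in for the benchmark's data type, of which the thread only shows a `vector` field with a `size`.

```scala
// Hypothetical stand-in for a data point; only `vector.size` matters here.
final case class Point(vector: Array[Double])

object InitialVectorModel {
  // Models line 43: map each point to its dimension and keep one of them.
  // (_, b) => b returns its second argument, so which element survives
  // depends on the order in which values are combined.
  // Assumes non-empty input: Seq.reduce throws on an empty sequence.
  def dimension(data: Seq[Point]): Int =
    data.map(_.vector.size).reduce((_, b) => b)

  // Models lines 68-74: turn the dimension into an all-zero initial vector.
  def initialVector(dimension: Int): Array[Double] =
    Array.fill(dimension)(0.0)
}

val points  = Seq(Point(Array(1.0, 2.0, 3.0)), Point(Array(4.0, 5.0, 6.0)))
val initial = InitialVectorModel.initialVector(InitialVectorModel.dimension(points))
println(initial.mkString("[", ", ", "]")) // [0.0, 0.0, 0.0]
```

If all points share one dimension, which element the reduce keeps is irrelevant.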
