flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chen Qin <qinnc...@gmail.com>
Subject s3 statebackend user state size
Date Tue, 10 May 2016 15:07:14 GMT
Hi there,

With S3 as state backend, as well as keeping a large chunk of user state on heap. I can see
task manager starts to fail without showing OOM exception. Instead, it shows a generic error
message (below) when checkpoint triggered. I assume this has something to do with how state
were kept in buffer and flush to s3 when checkpoint triggered. 

Future, to keep large key/value space, wiki point out using rocksdb as backend. My understanding
is using rocksdb will write to local file systems instead of sync to s3. Does flink support
memory->rocksdb(local disk)->s3 checkpoint state split yet? Or would implement kvstate
interface makes flink take care of large state problem?

Chen

java.lang.Exception: The slot in which the task was executed has been released. Probably loss
of TaskManager eddbcda03a61f61210063a7cd2148b36 @ 10.163.98.18 - 24 slots - URL: akka.tcp://flink@10.163.98.18:6124/user/taskmanager
at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151) at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119) at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LeaderSessionMessageFilter$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.LogMessages$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46) at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501) at akka.actor.ActorCell.invoke(ActorCell.scala:486)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Mime
View raw message