beam-user mailing list archives

From amir bahmanyari <amirto...@yahoo.com>
Subject Re: What does this exception mean to you?
Date Fri, 04 Nov 2016 18:27:32 GMT
Hi Aljoscha,
Thanks for your reply. I am using Microsoft Azure A11 VMs for a 4-node Beam/Flink cluster, and Dxy for ingestion, i.e. the 2-node Kafka cluster given below.

After I create the partitions on Kafka, the describe option shows that they are evenly distributed between the two Kafka nodes. Or, at least, this is my understanding. Below is partial output, and it shows that all 2048 partitions are in use.
Thanks so much for your help. I hope we can draw some conclusion out of this and find the bottleneck.
Have a great weekend.
Amir-
[aba@kafka01 kafka_2.11-0.10.0.1]$ ./bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic linroad3 | more
Topic:linroad3  PartitionCount:2048     ReplicationFactor:2     Configs:
        Topic: linroad3 Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 4    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 5    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 6    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 7    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 8    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 9    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 10   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 11   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 12   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 13   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 14   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 15   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 16   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 17   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 18   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 19   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 20   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 21   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 22   Leader: 1       Replicas: 1,2   Isr: 1,2
        .................................................................................................
        Topic: linroad3 Partition: 2045 Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 2046 Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 2047 Leader: 2       Replicas: 2,1   Isr: 2,1
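
The describe output above can be summarised mechanically instead of by eye. The following is a minimal sketch, not part of the original message: the function name leaders_per_broker and the four-line sample are illustrative assumptions. It counts how many partitions each broker leads. Note that this only measures leader placement across brokers; it says nothing about how many messages each partition actually holds.

```python
import re
from collections import Counter


def leaders_per_broker(describe_output: str) -> Counter:
    """Count partitions led by each broker id in `kafka-topics.sh --describe` text.

    Works on line-per-partition output as well as on output whose line
    breaks were lost, because it only looks for "Leader: <id>" tokens.
    """
    return Counter(re.findall(r"Leader:\s*(\d+)", describe_output))


# Hypothetical four-partition excerpt in the same shape as the output above.
sample = """
Topic: linroad3 Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
Topic: linroad3 Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
Topic: linroad3 Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
Topic: linroad3 Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
"""

counts = leaders_per_broker(sample)
for broker_id, led in sorted(counts.items()):
    print(f"broker {broker_id} leads {led} partition(s)")
```

Feeding it the full output of the describe command would give the leader count per broker for all 2048 partitions.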


From: Aljoscha Krettek <aljoscha@apache.org>
To: amir bahmanyari <amirtousa@yahoo.com>; "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
Sent: Friday, November 4, 2016 10:44 AM
Subject: Re: What does this exception mean to you?

Just out of curiosity, what machines are you running this on? I'm asking because the number
of task slots should roughly correlate with the number of CPU cores.
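
To make the slots-versus-cores comparison concrete, here is a small sketch. It is an illustration only: the helper name slot_report is made up, and the figure of 16 cores per node is an assumption that should be replaced with the real core count of the machines. The 4 nodes and 512 slots per TaskManager are the values quoted elsewhere in this thread.

```python
def slot_report(nodes: int, cores_per_node: int, slots_per_node: int) -> dict:
    """Compare configured task slots against available CPU cores."""
    return {
        # Highest parallelism the cluster can schedule with this slot count.
        "total_slots": nodes * slots_per_node,
        # How many slots compete for each core on one machine.
        "slots_per_core": slots_per_node / cores_per_node,
        # Parallelism if slots were set to roughly one per core.
        "core_matched_parallelism": nodes * cores_per_node,
    }


# cores_per_node=16 is an ASSUMPTION for illustration, not a value from the thread.
report = slot_report(nodes=4, cores_per_node=16, slots_per_node=512)
for key, value in report.items():
    print(f"{key}: {value}")
```

Under that assumption each core would be shared by 32 slots, which is far from the rough one-slot-per-core correlation described above.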
On Fri, 4 Nov 2016 at 10:34 Aljoscha Krettek <aljoscha@apache.org> wrote:

You should try and find out why everything is just happening on one node. Have you looked
at your Kafka Partitions, i.e. is the data evenly distributed across the partitions of your
Kafka topic or is all data pushed to one partition? This would actually explain why processing
is only happening on one node, namely the node that is reading the partition that has all
the data.
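
One way to answer that question is to look at per-partition message counts rather than partition placement. The sketch below assumes you already have such counts (for example, end offset minus beginning offset for each partition); how you obtain them is not shown, and the function name partition_skew and the sample numbers are hypothetical.

```python
def partition_skew(message_counts: dict) -> dict:
    """Summarise how evenly messages are spread over partitions.

    message_counts maps partition id -> number of messages in that partition.
    """
    total = sum(message_counts.values())
    if total == 0:
        return {"non_empty": 0, "max_share": 0.0}
    return {
        # Partitions that actually hold data.
        "non_empty": sum(1 for c in message_counts.values() if c > 0),
        # Fraction of all messages sitting in the fullest partition.
        "max_share": max(message_counts.values()) / total,
    }


# Hypothetical counts: everything keyed to one partition vs. spread evenly.
skewed = partition_skew({0: 1000, 1: 0, 2: 0, 3: 0})
even = partition_skew({0: 250, 1: 250, 2: 250, 3: 250})
print("skewed:", skewed)
print("even:  ", even)
```

A max_share close to 1.0 would match the situation described above, where a single reader, and therefore a single node, ends up with all the work.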
On Thu, 3 Nov 2016 at 20:53 amir bahmanyari <amirtousa@yahoo.com> wrote:

Thanks Aljoscha.
I have been tuning Flink memory, network buffers etc. And this occurred in THAT ONE NODE where I see *.out logs get created by Flink. I lowered the memory that Flink allocates, i.e. from 70% by default to 50%. And this exception was thrown in that one node only. Other nodes were up & didn't crash.
There is SOMETHING different about THAT ONE NODE :-) I cannot figure it out. At every ./start-cluster, THAT ONE NODE may or may not change on a random basis. So I can't just tune THAT ONE NODE. Next time, another node may become THAT ONE NODE.
I have the following set in flink-conf.yaml on each node:
akka.ask.timeout : 300s
jobmanager.heap.mb: 256 //Could this be too small?
taskmanager.heap.mb: 102400
taskmanager.memory.fraction: 0.6 //Changing this to a lower value causes the exception below. Am testing with 0.6 < 0.7 default.
taskmanager.numberOfTaskSlots: 512
taskmanager.memory.preallocate: false
parallelism.default: 2048
taskmanager.network.numberOfBuffers: 131072
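
As a sanity check on these settings, the sketch below compares the configured network buffer count with the rule of thumb that, as far as I recall, the Flink configuration documentation of that period gave for taskmanager.network.numberOfBuffers: slots-per-TaskManager squared, times the number of TaskManagers, times 4. Treat that formula as an assumption quoted from memory and check the documentation for your Flink version. The function name is made up for this example.

```python
def rule_of_thumb_buffers(slots_per_tm: int, task_managers: int) -> int:
    """Network buffers suggested by the assumed rule of thumb:
    slots_per_tm^2 * task_managers * 4."""
    return slots_per_tm ** 2 * task_managers * 4


configured = 131072                      # taskmanager.network.numberOfBuffers above
suggested = rule_of_thumb_buffers(512, 4)  # 512 slots per TaskManager, 4 nodes

print(f"configured buffers: {configured}")
print(f"rule-of-thumb buffers for 512 slots x 4 TaskManagers: {suggested}")
print(f"configured / suggested: {configured / suggested:.3f}")
```

If the rule applies, 512 slots per TaskManager would call for far more buffers than are configured, which is one more argument for a much smaller slot count rather than a larger buffer pool.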


Appreciate any feedback.
Amir-
From: Aljoscha Krettek <aljoscha@apache.org>
To: amir bahmanyari <amirtousa@yahoo.com>; "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
Sent: Thursday, November 3, 2016 12:45 AM
Subject: Re: What does this exception mean to you?

That looks like a Flink problem. The TaskManager on beam4 seems to have crashed for some reason.
You might be able to find that reason by looking at the logs on that machine.
On Thu, 3 Nov 2016 at 04:53 amir bahmanyari <amirtousa@yahoo.com> wrote:

Thanks+regards,
beam4 and beam1 are hostnames. BenchBeamRunners.java is my Beam app running in a four-server Flink cluster.
Other nodes are still running except the one that failed, beam4. beam1 has the JM running.
Amir-
java.lang.RuntimeException: Pipeline execution failed
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
        at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
        ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots - URL: akka.tcp://flink@10.0.0.7:44399/user/taskmanager
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        ... 2 more



 



   