beam-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: What does this exception mean to you?
Date Fri, 04 Nov 2016 23:54:41 GMT
Yes, the degree of parallelism should be 64 then. A higher slot count means
more overhead. It's similar to how the number of parallel threads should not
be too high compared to the number of CPU cores. If you have 1 CPU core and
run a program that is split across 1000 threads, that will be quite a bit
slower than one thread executing on that one CPU core.

What was the degree of parallelism when you were executing your program
with 512 slots per node? 2048?
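
For reference, with 4 TaskManagers on 16-core machines that combination
would correspond to roughly these flink-conf.yaml entries (just a sketch of
the suggestion, adjust to your setup):

taskmanager.numberOfTaskSlots: 16
parallelism.default: 64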

I would also suggest, again, trying to find out how the data is written to
the partitions of the linroad3 topic. If I'm not mistaken, Kafka has metrics
that reveal that sort of information. I'm not an expert on that, though, so
I don't know the specifics of how that works on your system.
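
One quick way to check, if I'm not mistaken, is the GetOffsetShell tool that
ships with Kafka; it prints the latest offset per partition, so partitions
that receive little or no data should stand out. Something along these lines
(adjust broker host/port to your setup):

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka01:9092 --topic linroad3 --time -1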

On Sat, 5 Nov 2016 at 00:26 amir bahmanyari <amirtousa@yahoo.com> wrote:

>
> Thanks Aljoscha,
> Sure, will try 16 slots per node. So the degree of parallelism should be
> 16x4 = 64, I assume.
> Question: why might 16 slots per node perform better than 512?
> Doesn't a higher degree of parallelism imply higher app throughput?
>
> It's still a mystery why System.out.println() output goes to only one
> node's *.out files.
> And whenever there is a runtime issue, THAT NODE with the *.out crashes
> and everything else is still up & running.
> My understanding of "cluster" is that we should get app output on all
> nodes in the cluster, depending on which record is being processed on
> which node.
>
> The data is sent to the Kafka01 node from a Java client that reads one
> record at a time from a data file and sends it to port 9092 on node
> Kafka01 (master), to the topic linroad3.
> The Kafka01 and Kafka02 nodes are clustered, as indicated by the output of
> --describe below.
>
> server.properties: listeners=PLAINTEXT://:9092
> --zookeeper kafka01:2181
>
> Please let me know if this looks OK to you + have a great weekend.
>
> Thanks again.
> Amir-
>
>
>
> ------------------------------
> *From:* Aljoscha Krettek <aljoscha@apache.org>
> *To:* amir bahmanyari <amirtousa@yahoo.com>; "
> user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
> *Sent:* Friday, November 4, 2016 3:24 PM
>
> *Subject:* Re: What does this exception mean to you?
>
> OK, with those machines I highly recommend setting the slot count to 16,
> at most.
>
> For Kafka, the fact that the partitions are distributed across the
> machines (which they seem to be) does not guarantee that the data is
> written evenly to all partitions. How is the data written to Kafka?
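>
> As far as I know, the deciding factor is whether your producer sets a key
> on each record: with the default partitioner, records without a key are
> spread round-robin over the partitions, while keyed records are hashed on
> the key, so a constant key would funnel everything into one partition.
> Just as an illustration (a sketch, not necessarily what your client does;
> the class name, file argument and broker address are placeholders), a
> keyless producer would look roughly like this:
>
>   import java.nio.file.Files;
>   import java.nio.file.Paths;
>   import java.util.Properties;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>
>   public class LinroadProducer {
>     public static void main(String[] args) throws Exception {
>       Properties props = new Properties();
>       props.put("bootstrap.servers", "kafka01:9092");
>       props.put("key.serializer",
>           "org.apache.kafka.common.serialization.StringSerializer");
>       props.put("value.serializer",
>           "org.apache.kafka.common.serialization.StringSerializer");
>       try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>         for (String line : Files.readAllLines(Paths.get(args[0]))) {
>           // No key on the record, so the default partitioner spreads the
>           // records over all partitions of the topic.
>           producer.send(new ProducerRecord<>("linroad3", line));
>         }
>       }
>     }
>   }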
>
> On Fri, 4 Nov 2016 at 19:31 amir bahmanyari <amirtousa@yahoo.com> wrote:
>
> Hi Aljoscha
> Sorry, the Kafka VMs are A10 (not Dxy).
> In case the images don't get there:
> A11: 16 cores, 112 GB, 16 Data disks, 16x500 Max IOPS, Load Balancing
> A10: 8 cores, 56 GB, 16 Data disks, 16x500 Max IOPS, Load Balancing
>
> Thanks.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <amirtousa@yahoo.com>
> *To:* "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
> *Sent:* Friday, November 4, 2016 11:27 AM
>
> *Subject:* Re: What does this exception mean to you?
>
> Hi Aljoscha,
> Thanks for your reply.
> I am using Microsoft Azure A11 VMs for a 4-node Beam/Flink cluster & A10
> for ingestion, i.e. the 2-node Kafka cluster given below.
> [image: Inline image]
> [image: Inline image]
>
> After I create the partitions on Kafka, the describe option shows that
> they are evenly distributed between the two Kafka nodes.
> Or at least this is my understanding. Below is partial output, and it
> shows that all 2048 partitions are in use.
> Thanks so much for your help. I hope we can draw some conclusions out of
> this and find the bottleneck.
> Have a great weekend.
> Amir-
>
> [aba@kafka01 kafka_2.11-0.10.0.1]$ ./bin/kafka-topics.sh --describe
> --zookeeper kafka01:2181 --topic linroad3 |more
> Topic:linroad3  PartitionCount:2048     ReplicationFactor:2     Configs:
>         Topic: linroad3 Partition: 0    Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 1    Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 2    Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 3    Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 4    Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 5    Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 6    Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 7    Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 8    Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 9    Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 10   Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 11   Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 12   Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 13   Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 14   Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 15   Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 16   Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 17   Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 18   Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 19   Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 20   Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 21   Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 22   Leader: 1       Replicas: 1,2
> Isr: 1,2
>
> .................................................................................................
>         Topic: linroad3 Partition: 2045 Leader: 2       Replicas: 2,1
> Isr: 2,1
>         Topic: linroad3 Partition: 2046 Leader: 1       Replicas: 1,2
> Isr: 1,2
>         Topic: linroad3 Partition: 2047 Leader: 2       Replicas: 2,1
> Isr: 2,1
>
>
>
> ------------------------------
> *From:* Aljoscha Krettek <aljoscha@apache.org>
> *To:* amir bahmanyari <amirtousa@yahoo.com>; "
> user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
> *Sent:* Friday, November 4, 2016 10:44 AM
> *Subject:* Re: What does this exception mean to you?
>
> Just out of curiosity, what machines are you running this on? I'm asking
> because the number of task slots should roughly correlate with the number
> of CPU cores.
>
> On Fri, 4 Nov 2016 at 10:34 Aljoscha Krettek <aljoscha@apache.org> wrote:
>
> You should try to find out why everything is just happening on one node.
> Have you looked at your Kafka partitions, i.e. is the data evenly
> distributed across the partitions of your Kafka topic, or is all data
> pushed to one partition? That would actually explain why processing is
> only happening on one node, namely the node that is reading the partition
> that has all the data.
>
> On Thu, 3 Nov 2016 at 20:53 amir bahmanyari <amirtousa@yahoo.com> wrote:
>
> Thanks Aljoscha.
> I have been tuning Flink memory, NW buffers, etc.
> And this occurred on THAT ONE NODE where I see the *.out logs get created
> by Flink.
> I lowered the memory that Flink allocates, i.e. from the default of 70% to
> 50%.
> And this exception was thrown on that one node only. Other nodes were up &
> didn't crash.
> There is SOMETHING different about THAT ONE NODE :-) I cannot figure it
> out.
> At every ./start-cluster, THAT ONE NODE may or may not change, on a random
> basis.
> So I can't just tune THAT ONE NODE. Next time, another node may become
> THAT ONE NODE.
>
> I have the followings set in flink-conf.yaml in each node:
>
> akka.ask.timeout : 300s
> jobmanager.heap.mb: 256 //Could this be too small?
> taskmanager.heap.mb: 102400
> taskmanager.memory.fraction: 0.6 // Changing this to a lower value causes
> the exception below. Am testing with 0.6 (the default is 0.7).
> taskmanager.numberOfTaskSlots: 512
> taskmanager.memory.preallocate: false
> parallelism.default: 2048
> taskmanager.network.numberOfBuffers: 131072
>
>
> Appreciate any feedback.
> Amir-
> ------------------------------
> *From:* Aljoscha Krettek <aljoscha@apache.org>
> *To:* amir bahmanyari <amirtousa@yahoo.com>; "
> user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
> *Sent:* Thursday, November 3, 2016 12:45 AM
> *Subject:* Re: What does this exception mean to you?
>
> That looks like a Flink problem. The TaskManager on beam4 seems to have
> crashed for some reason. You might be able to find that reason by looking
> at the logs on that machine.
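>
> (If it helps: the TaskManager log is usually a flink-<user>-taskmanager-*.log
> file, with a matching *.out, under the log/ directory of the Flink
> installation on beam4; the exact file names depend on your setup.)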
>
> On Thu, 3 Nov 2016 at 04:53 amir bahmanyari <amirtousa@yahoo.com> wrote:
>
> Thanks+regards,
> beam4 and beam1 are hostnames.
> BenchBeamRunners.java is my Beam app running in a four-server Flink
> cluster.
> Other nodes are still running except the one that failed, beam4.
> beam1 has the JM running.
>
> Amir-
>
> java.lang.RuntimeException: Pipeline execution failed
>         at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>         at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
>         at
> benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Job execution failed.
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>         at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
>         at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
>         ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The slot in which the task was executed
> has been released. Probably loss of TaskManager
> 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots - URL: akka.tcp://
> flink@10.0.0.7:44399/user/taskmanager
>         at
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
>         at
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>         at
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>         at
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
>         at
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         ... 2 more
>
>
