flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amir bahmanyari <amirto...@yahoo.com.INVALID>
Subject Re: Get Flink ExecutionGraph Programmatically
Date Wed, 21 Sep 2016 18:37:02 GMT
My only 2 cents is that when I started to turn the mem pre-allocation param, to true &
#slots & #buffers....I started to get all kinds of Akka & Disassociated exceptions
thrown by the JM regarding the TMs...So yes, since I am also not well aware of Akka internals...I
went back to my previous config & continued with turning knobs that wouldn't cause Akka
exceptions.Thanks+regardsAmir-

      From: "Chawla,Sumit" <sumitkchawla@gmail.com>
 To: dev@flink.apache.org 
 Sent: Wednesday, September 21, 2016 11:08 AM
 Subject: Re: Get Flink ExecutionGraph Programmatically
   
Hi Chesney

I am actually running this code in the same JVM as the WebInterface and
JobManager.  I am programmatically, starting the JobManager. and  then
running this code in same JVM to query metrics.  Only difference could be
that i am creating a new Akka ActorSystem, and ActorGateway. Not sure if it
forces it to execute the code as if request is coming over the wire.  I am
not very well aware of Akka internals, so may be somebody can shed some
light on it.

Regards
Sumit Chawla


On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <chesnay@apache.org>
wrote:

> Hello,
>
> this is a rather subtle issue you stumbled upon here.
>
> The ExecutionGraph is not serializable. The only reason why the
> WebInterface can access it is because it runs in the same JVM as the
> JobManager.
>
> I'm not sure if there is a way for what you are trying to do.
>
> Regards,
> Chesnay
>
>
> On 21.09.2016 06:11, Chawla,Sumit wrote:
>
>> Hi All
>>
>>
>> I am trying to get JOB  accumulators.  ( I am aware that I can get the
>> accumulators through REST APIs as well, but i wanted to avoid JSON
>> parsing).
>>
>> Looking at JobAccumulatorsHandler i am trying to get execution graph for
>> currently running job.  Following is my code:
>>
>>    InetSocketAddress initialJobManagerAddress=new
>> InetSocketAddress(hostName,port);
>>              InetAddress ownHostname;
>>              ownHostname=
>> ConnectionUtils.findConnectingAddress(initialJobManagerAddress,2000,400);
>>
>>              ActorSystem actorSystem= AkkaUtils.createActorSystem(co
>> nfiguration,
>>                      new Some(new
>> Tuple2<String,Object>(ownHostname.getCanonicalHostName(),0)));
>>
>>              FiniteDuration timeout= FiniteDuration.apply(10,
>> TimeUnit.SECONDS);
>>
>>              ActorGateway akkaActorGateway=
>> LeaderRetrievalUtils.retrieveLeaderGateway(
>>
>> LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
>>                      actorSystem,timeout
>>              );
>>
>>
>>              Future<Object> future=akkaActorGateway.ask(new
>> RequestJobDetails(true,false),timeout);
>>
>>              MultipleJobsDetails result=(MultipleJobsDetails)
>> Await.result(future,timeout);
>>              ExecutionGraphHolder executionGraphHolder=new
>> ExecutionGraphHolder(timeout);
>>              LOG.info(result.toString());
>>              for(JobDetails detail:result.getRunningJobs()){
>>                  LOG.info(detail.getJobName() + "  ID " +
>> detail.getJobId());
>>
>> *                ExecutionGraph
>> executionGraph=executionGraphHolder.getExecutionGraph(detail.getJobId(),
>> akkaActorGateway);*
>>
>>                LOG.info("Accumulators " +
>> executionGraph.aggregateUserAccumulators());
>>              }
>>
>>
>> However, i am receiving following error in Flink:
>>
>> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody
>> ERROR akka.remote.EndpointWriter - Transient association error
>> (association
>> remains live)
>> java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.
>> CheckpointCoordinator
>>          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.
>> java:1184)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputSt
>> ream.java:1548)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStrea
>> m.java:1509)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputS
>> tream.java:1432)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.
>> java:1178)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputSt
>> ream.java:1548)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStrea
>> m.java:1509)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputS
>> tream.java:1432)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.
>> java:1178)
>> ~[?:1.8.0_92]
>>          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.
>> java:348)
>> ~[?:1.8.0_92]
>>          at akka.serialization.JavaSerializer$$anonfun$
>> toBinary$1.apply$mcV$sp(Serializer.scala:129)
>> ~[akka-actor_2.10-2.3.7.jar:?]
>>          at akka.serialization.JavaSerializer$$anonfun$
>> toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>          at akka.serialization.JavaSerializer$$anonfun$
>> toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>>          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:
>> 57)
>> ~[scala-library-2.10.5.jar:?]
>>          at akka.serialization.JavaSerializer.toBinary(Serializer.scala:
>> 129)
>> ~[akka-actor_2.10-2.3.7.jar:?]
>>          at akka.remote.MessageSerializer$.serialize(MessageSerializer.s
>> cala:36)
>> ~[akka-remote_2.10-2.3.7.jar:?]
>>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply
>> (Endpoint.scala:845)
>> ~[akka-remote_2.10-2.3.7.jar:?]
>>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply
>> (Endpoint.scala:845)
>> ~[akka-remote_2.10-2.3.7.jar:?]
>>          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:
>> 57)
>> ~[scala-library-2.10.5.jar:?]
>>          at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:
>> 844)
>> ~[akka-remote_2.10-2.3.7.jar:?]
>>
>> Any reason why its failing? This code works when invoked through
>> WebRuntimeMonitor.
>>
>> Regards
>> Sumit Chawla
>>
>>
>


   
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message