flink-dev mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Get Flink ExecutionGraph Programmatically
Date Wed, 21 Sep 2016 08:06:52 GMT
Hello,

this is a rather subtle issue you stumbled upon here.

The ExecutionGraph is not serializable. The only reason the 
WebInterface can access it is that it runs in the same JVM as the 
JobManager.
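
To illustrate the mechanism, here is a minimal sketch using plain Java 
serialization. Graph and Coordinator are hypothetical stand-ins (not the 
Flink classes), and trySerialize is a helper made up for this example. 
Within one JVM the object is reached by reference; once it has to cross 
a process boundary it must be serialized, and a single non-serializable 
field is enough to make that fail.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for a component that does not implement Serializable.
    static class Coordinator {
        final long interval = 5000L;
    }

    // Hypothetical stand-in for an object that is declared Serializable
    // but holds a field whose type is not.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName = "example-job";
        final Coordinator coordinator = new Coordinator();
    }

    // Attempts Java serialization of the given object.
    // Returns null on success, or the exception message (the name of the
    // offending class) if a NotSerializableException is thrown.
    static String trySerialize(Object message) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(message);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        Graph graph = new Graph();

        // Same JVM: a plain reference, nothing is serialized.
        System.out.println("local access: " + graph.coordinator.interval);

        // Crossing a process boundary: the whole object graph must be written out.
        System.out.println("serialization failed on: " + trySerialize(graph));
    }
}
```

The second call fails on the nested Coordinator field rather than on 
Graph itself, which matches the shape of the stack trace quoted below, 
where the exception names CheckpointCoordinator.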

I'm not sure there is a way to do what you are trying to do.

Regards,
Chesnay

On 21.09.2016 06:11, Chawla,Sumit wrote:
> Hi All
>
>
> I am trying to get job accumulators. (I am aware that I can get the
> accumulators through the REST APIs as well, but I wanted to avoid JSON
> parsing.)
>
> Looking at JobAccumulatorsHandler, I am trying to get the execution graph
> for the currently running job. Following is my code:
>
>     InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
>     InetAddress ownHostname;
>     ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
>
>     ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
>             new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
>
>     FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);
>
>     ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
>             LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
>             actorSystem, timeout
>     );
>
>     Future<Object> future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);
>
>     MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
>     ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
>     LOG.info(result.toString());
>     for (JobDetails detail : result.getRunningJobs()) {
>         LOG.info(detail.getJobName() + "  ID " + detail.getJobId());
>
>         // the call in question
>         ExecutionGraph executionGraph =
>                 executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);
>         LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
>     }
>
>
> However, I am receiving the following error in Flink:
>
> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
>     at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
>     at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
>     at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]
>
> Any reason why it's failing? This code works when invoked through
> WebRuntimeMonitor.
>
> Regards
> Sumit Chawla
>

