flink-user mailing list archives

From "Chawla,Sumit " <sumitkcha...@gmail.com>
Subject Get Flink ExecutionGraph Programmatically
Date Tue, 20 Sep 2016 21:24:35 GMT
Hi All


I am trying to get the accumulators of a job. (I am aware that I can
also get them through the REST API, but I wanted to avoid JSON
parsing.)

Following the approach in JobAccumulatorsHandler, I am trying to get the
execution graph of the currently running job. Here is my code:

    InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
    InetAddress ownHostname =
            ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);

    ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
            new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));

    FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);

    ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
            LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
            actorSystem, timeout);

    Future<Object> future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);

    MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
    ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
    LOG.info(result.toString());
    for (JobDetails detail : result.getRunningJobs()) {
        LOG.info(detail.getJobName() + "  ID " + detail.getJobId());

        // The next call is the one highlighted in the original message.
        ExecutionGraph executionGraph =
                executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);
        LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
    }


However, I am receiving the following error in Flink:

2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]
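
For reference, the nested writeObject0 / defaultWriteFields frames in the
trace are the pattern Java serialization produces when a Serializable
object reaches a non-serializable one through a field. The sketch below
reproduces only that mechanism in isolation. Coordinator, Graph and
GraphWithTransientField are hypothetical stand-ins, not Flink classes,
and the sketch does not claim to show how Flink's classes are actually
laid out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationSketch {

    // Hypothetical stand-in for a class that does NOT implement Serializable.
    static class Coordinator {
    }

    // Hypothetical stand-in for a Serializable object holding such a reference.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final Coordinator coordinator = new Coordinator();
    }

    // Same shape, but the field is transient, so Java serialization skips it.
    static class GraphWithTransientField implements Serializable {
        private static final long serialVersionUID = 1L;
        final transient Coordinator coordinator = new Coordinator();
    }

    /**
     * Serializes the value into an in-memory buffer.
     * Returns null on success, or the message of the NotSerializableException
     * (the name of the offending class) on failure.
     */
    static String serialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("plain field:     " + serialize(new Graph()));
        System.out.println("transient field: " + serialize(new GraphWithTransientField()));
    }
}
```

In this sketch the first call reports the Coordinator class as not
serializable, while the second succeeds because the transient field is
skipped. Whether something comparable applies to the object sent back
over Akka here is an assumption, not something the trace alone confirms.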

Any idea why it is failing? The same code works when invoked through
WebRuntimeMonitor.

Regards
Sumit Chawla
