flink-dev mailing list archives

From "Chawla,Sumit " <sumitkcha...@gmail.com>
Subject Re: Get Flink ExecutionGraph Programmatically
Date Wed, 21 Sep 2016 19:14:58 GMT
Hi Sean

My goal here is to get the user accumulators. I know the REST calls exist,
but since I am running my code in the same JVM, I wanted to avoid going
over HTTP. I saw this code in JobAccumulatorsHandler and tried to use it.
Could you suggest an alternative approach that avoids the over-the-network
serialization in Akka?

Regards
Sumit Chawla
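
A minimal sketch of the failure mode discussed in this thread, assuming plain Java serialization (the mechanism behind the akka.serialization.JavaSerializer frames in the stack trace quoted below). Graph, Coordinator and AccumulatorSnapshot are hypothetical stand-ins, not Flink classes: Graph plays the role of an object that holds a runtime resource, and AccumulatorSnapshot the role of a plain result that could safely cross an actor-system boundary.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationSketch {

    // Hypothetical stand-in for a runtime resource (not Serializable).
    static class Coordinator { }

    // Hypothetical stand-in for a graph object: Serializable itself,
    // but it holds a non-transient field that is not Serializable.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final Coordinator coordinator = new Coordinator();
    }

    // Hypothetical plain result object containing only serializable fields.
    static class AccumulatorSnapshot implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final long value;

        AccumulatorSnapshot(String name, long value) {
            this.name = name;
            this.value = value;
        }
    }

    // Returns true if Java serialization accepts the object and false if it
    // is rejected with NotSerializableException.
    static boolean canSerialize(Object message) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(message);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("graph serializable:    " + canSerialize(new Graph()));
        System.out.println("snapshot serializable: "
            + canSerialize(new AccumulatorSnapshot("records", 42L)));
    }
}
```

Under these assumptions, a reply that wraps a live runtime object fails as soon as it has to be serialized, while a small value object with only serializable fields does not.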


On Wed, Sep 21, 2016 at 11:37 AM, Stephan Ewen <sewen@apache.org> wrote:

> Between two different actor systems in the same JVM, messages are still
> serialized (they go through a local socket, I think).
>
> Getting the execution graph is not easily possible, and not intended, as it
> actually contains RPC resources, etc.
>
> What do you need from the execution graph? Maybe there is another way to
> achieve that...
>
> On Wed, Sep 21, 2016 at 8:08 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> wrote:
>
> > Hi Chesnay
> >
> > I am actually running this code in the same JVM as the WebInterface and
> > JobManager. I start the JobManager programmatically and then run this
> > code in the same JVM to query metrics. The only difference could be that
> > I am creating a new Akka ActorSystem and ActorGateway. I am not sure
> > whether that forces the code to execute as if the request were coming
> > over the wire. I am not very familiar with Akka internals, so maybe
> > somebody can shed some light on it.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <chesnay@apache.org>
> > wrote:
> >
> > > Hello,
> > >
> > > this is a rather subtle issue you stumbled upon here.
> > >
> > > The ExecutionGraph is not serializable. The only reason why the
> > > WebInterface can access it is because it runs in the same JVM as the
> > > JobManager.
> > >
> > > I'm not sure if there is a way for what you are trying to do.
> > >
> > > Regards,
> > > Chesnay
> > >
> > >
> > > On 21.09.2016 06:11, Chawla,Sumit wrote:
> > >
> > >> Hi All
> > >>
> > >>
> > >> I am trying to get job accumulators. (I am aware that I can get the
> > >> accumulators through the REST APIs as well, but I wanted to avoid JSON
> > >> parsing.)
> > >>
> > >> Looking at JobAccumulatorsHandler, I am trying to get the execution
> > >> graph for the currently running job. The following is my code:
> > >>
> > >>     InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
> > >>     InetAddress ownHostname;
> > >>     ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
> > >>
> > >>     ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
> > >>             new Some(new Tuple2<String,Object>(ownHostname.getCanonicalHostName(), 0)));
> > >>
> > >>     FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);
> > >>
> > >>     ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
> > >>             LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
> > >>             actorSystem, timeout
> > >>     );
> > >>
> > >>     Future<Object> future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);
> > >>
> > >>     MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
> > >>     ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
> > >>     LOG.info(result.toString());
> > >>     for (JobDetails detail : result.getRunningJobs()) {
> > >>         LOG.info(detail.getJobName() + "  ID " + detail.getJobId());
> > >>
> > >>         *ExecutionGraph executionGraph = executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);*
> > >>
> > >>         LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
> > >>     }
> > >>
> > >>
> > >> However, I am receiving the following error in Flink:
> > >>
> > >> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> > >> java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > >>          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
> > >>          at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
> > >>          at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
> > >>          at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > >>          at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
> > >>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
> > >>          at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
> > >>          at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
> > >>          at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]
> > >>
> > >> Any reason why it's failing? This code works when invoked through
> > >> WebRuntimeMonitor.
> > >>
> > >> Regards
> > >> Sumit Chawla
> > >>
> > >>
> > >
> >
>
