From: Robert Metzger
Date: Mon, 29 Jun 2015 09:53:38 +0200
Subject: Re: ArrayIndexOutOfBoundsException when running job from JAR
To: user@flink.apache.org

It is working in the IDE because there we execute everything in the same
JVM, so the mapper can access the correct value of the static variable.
When submitting a job with the CLI frontend, there are at least two JVMs
involved, and code running in the JobManager/TaskManager cannot access the
value of the static variable in the CLI frontend.

On Sun, Jun 28, 2015 at 9:43 PM, Vasiliki Kalavri wrote:
> Hi everyone,
>
> Mihail and I have now solved the issue.
>
> The exception was caused because the array size in question was read from
> a static field of the enclosing class, inside an anonymous mapper. Making
> the mapper a standalone class and passing the array size to the
> constructor solved the issue.
>
> What I don't understand, though, is why this worked fine when the job was
> executed from inside the IDE. Is serialization handled differently
> (skipped) in this case?
>
> Cheers,
> Vasia.
>
> On 26 June 2015 at 11:30, Mihail Vieru <vieru@informatik.hu-berlin.de> wrote:
>
>> Hi Vasia,
>>
>> InitVerticesMapper is called in the run method of APSP:
>>
>>     @Override
>>     public Graph<K, Tuple2<Integer[], String>, NullValue> run(Graph<K, Tuple2<Integer[], String>, NullValue> input) {
>>
>>         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>>         parameters.setSolutionSetUnmanagedMemory(false);
>>
>>         return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
>>                 .runVertexCentricIteration(new VertexDistanceUpdater<K, Tuple2<Integer[], String>, Integer>(srcVertexId),
>>                         new MinDistanceMessenger<K, Tuple2<Integer[], String>, Integer, NullValue>(srcVertexId),
>>                         maxIterations, parameters);
>>     }
>>
>> I'll send you the full code via a private e-mail.
>>
>> Cheers,
>> Mihail
>>
>> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>>
>> Hi Mihail,
>>
>> could you share your code or at least the implementations of
>> getVerticesDataSet() and InitVerticesMapper so I can take a look?
>> Where is InitVerticesMapper called above?
>>
>> Cheers,
>> Vasia.
>>
>> On 26 June 2015 at 10:51, Mihail Vieru <vieru@informatik.hu-berlin.de> wrote:
>>
>>> Hi Robert,
>>>
>>> I'm using the same input data, as well as the same parameters I use in
>>> the IDE's run configuration.
>>> I don't run the job on the cluster (yet), but locally, by starting Flink
>>> with the start-local.sh script.
>>>
>>> I will try to explain my code a bit. The Integer[] array is
>>> initialized in the getVerticesDataSet() method.
>>>
>>>         DataSet<Vertex<Integer, Tuple2<Integer[], String>>> vertices = getVerticesDataSet(env);
>>>         ...
>>>         Graph<Integer, Tuple2<Integer[], String>, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
>>>         ...
>>>         Graph<Integer, Tuple2<Integer[], String>, NullValue> intermediateGraph =
>>>                 graph.run(new APSP<Integer>(srcVertexId, maxIterations));
>>>
>>> In APSP I access the array in the InitVerticesMapper, but there it is
>>> suddenly empty.
>>>
>>> Best,
>>> Mihail
>>>
>>> On 26.06.2015 10:00, Robert Metzger wrote:
>>>
>>> Hi Mihail,
>>>
>>> the exception has been thrown from
>>> graphdistance.APSP$InitVerticesMapper.map(APSP.java:74). I guess that
>>> is code written by you or a library you are using.
>>> Maybe the data you are using on the cluster is different from your
>>> local test data?
>>>
>>> Best,
>>> Robert
>>>
>>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru
>>> <vieru@informatik.hu-berlin.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR
>>>> in the CLI.
>>>> This doesn't occur in the IDE.
>>>>
>>>> I've built the JAR using the "maven-shade-plugin" and the pom.xml
>>>> configuration Robert has provided here:
>>>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>>>> I specify the entry point using the "-c" option.
>>>>
>>>> The array the Exception refers to is actually initialized when a
>>>> vertices dataset is read from the file system.
>>>>
>>>> Any ideas on what could cause this issue?
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>> P.S.: the stack trace:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>     at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:389)
>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:387)
>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
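
[Archive note] The failure mode and fix discussed in this thread can be sketched without any Flink dependency. The sketch below uses plain Java serialization to stand in for how Flink ships user-function instances from the client to the TaskManager; all class and field names are hypothetical, not from the code in the thread. The point it demonstrates: static fields are not part of an object's serialized form, so a mapper reading one sees the remote JVM's (default) value, while a value passed to the constructor travels with the instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldDemo {

    // Plays the role of the static field in the enclosing job class.
    static int arraySize = 0;

    // Broken pattern: the mapper reads the static field at map() time.
    static class BadMapper implements Serializable {
        int[] map() { return new int[arraySize]; }
    }

    // Fixed pattern: the size travels with the serialized instance.
    static class GoodMapper implements Serializable {
        private final int size;
        GoodMapper(int size) { this.size = size; }
        int[] map() { return new int[size]; }
    }

    // Serialize and deserialize an object, standing in for how the user
    // function is shipped from the client JVM to the TaskManager JVM.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new ObjectOutputStream(bos).writeObject(obj);
            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Array length the shipped BadMapper sees "remotely".
    static int badAfterShipping() {
        arraySize = 5;                        // set in the "client" JVM
        BadMapper copy = roundTrip(new BadMapper());
        arraySize = 0;                        // fresh "TaskManager" JVM: static never set
        return copy.map().length;             // 0 -> indexing into it throws AIOOBE
    }

    // Array length the shipped GoodMapper sees "remotely".
    static int goodAfterShipping() {
        arraySize = 5;
        GoodMapper copy = roundTrip(new GoodMapper(arraySize));
        arraySize = 0;
        return copy.map().length;             // still 5: carried as instance state
    }

    public static void main(String[] args) {
        System.out.println("bad mapper sees length:  " + badAfterShipping());   // 0
        System.out.println("good mapper sees length: " + goodAfterShipping());  // 5
    }
}
```

In the IDE, client and "worker" are the same JVM, so even the BadMapper pattern sees the initialized static value, which is why the job only failed when submitted through the CLI frontend.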