flink-user mailing list archives

From Philipp Goetze <philipp.goe...@tu-ilmenau.de>
Subject Re: Submitting jobs from within Scala code
Date Thu, 16 Jul 2015 15:45:39 GMT
Hi Till,

Many thanks for your effort. I finally got it working.

I'm a bit embarrassed: the issue was solved by compiling against the same 
flink-dist JAR as the locally running Flink version. In other words, I had 
compiled against an older snapshot version than the one I was running :-[
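
One way to spot this kind of mismatch is to print which JAR a class was loaded from on the client side and compare it with the JAR the cluster runs. The following is only a minimal sketch: `ClasspathVersionCheck` and its helpers are hypothetical names, `scala.Option` is a stand-in for a Flink class, and it assumes the JAR manifest carries an Implementation-Version entry (otherwise "unknown" is printed).

```scala
// Hypothetical helper, not part of Flink: reports where a class was loaded from.
object ClasspathVersionCheck {

  // Implementation-Version from the JAR manifest, if the JAR declares one.
  def implementationVersion(cls: Class[_]): Option[String] =
    Option(cls.getPackage).flatMap(p => Option(p.getImplementationVersion))

  // Location of the JAR or directory the class was loaded from.
  // The code source is null for classes loaded by the bootstrap class loader.
  def jarLocation(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // One-line summary suitable for logging before a job is submitted.
  def describe(cls: Class[_]): String = {
    val version = implementationVersion(cls).getOrElse("unknown")
    val location = jarLocation(cls).getOrElse("unknown")
    s"${cls.getName}: version=$version, loaded from $location"
  }

  def main(args: Array[String]): Unit = {
    // Stand-in class; in a Flink client one would pass a Flink class instead,
    // e.g. classOf[org.apache.flink.api.scala.ExecutionEnvironment].
    println(describe(classOf[Option[_]]))
  }
}
```

If the printed location points to a different flink-dist JAR than the one the running cluster was started from, the client and the cluster are probably built from different snapshots.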

Best Regards,
Philipp

On 16.07.2015 17:35, Till Rohrmann wrote:
>
> Hi Philipp,
>
> To run a Flink program on a cluster from within my IDE, I usually 
> create a |RemoteExecutionEnvironment|. Since the program defines one UDF 
> (the map function that doubles the values), I also need to 
> specify the jar containing this class. In my case, the jar is called 
> test-1.0-SNAPSHOT.jar.
>
> |def main(args: Array[String]) {
>      // set up the execution environment
>      val env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "target/test-1.0-SNAPSHOT.jar")
>
>      val elements = env.fromElements(1,2,3,4,5)
>
>      val doubled = elements.map(x => 2*x)
>
>      doubled.printOnTaskManager("TaskManager")
>
>      // execute program
>      env.execute("Flink Scala API Skeleton")
>    }
> |
>
> But I also tried your approach of how to submit jobs to Flink and it 
> worked for me as well. Therefore, I guess that there is something 
> wrong with your job. What happens in |PigStorage().load|?
>
> Cheers,
> Till
>
> On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze <philipp.goetze@tu-ilmenau.de> wrote:
>
>     Hey Till,
>
>     I think my previous mail was intercepted or something similar.
>     In any case, you can find my reply below. I also tried a simpler job
>     that just calls env.fromElements... but I still get the same stack trace.
>
>     How do you normally submit jobs (jars) from within the code?
>
>     Best Regards,
>     Philipp
>
>
>     -------- Forwarded Message --------
>     Subject: 	Re: Submitting jobs from within Scala code
>     Date: 	Thu, 16 Jul 2015 14:31:01 +0200
>     From: 	Philipp Goetze <philipp.goetze@tu-ilmenau.de>
>     To: 	user@flink.apache.org
>
>
>
>     Hey,
>
>     from the JobManager I do not get any more hints:
>
>     13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist          - Received message RequestJobCounts at akka://flink/user/archive from Actor[akka://flink/temp/$gc].
>     13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist          - Handled message RequestJobCounts in 0 ms from Actor[akka://flink/temp/$gc].
>     13:36:06,674 DEBUG org.eclipse.jetty.util.log                                   - RESPONSE /jobsInfo  200
>     13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager               - Received message RequestBlobManagerPort at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
>     13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager               - Handled message RequestBlobManagerPort in 0 ms from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
>     13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection           - Received PUT request for content addressable BLOB
>     13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager               - Received message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
>     13:36:07,087 INFO  org.apache.flink.runtime.jobmanager.JobManager               - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
>     13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager               - Running initialization on master for job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
>     13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager               - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query)
>     org.apache.flink.runtime.client.JobSubmissionException: The vertex null (null) has no invokable class.
>     	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
>     	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>     	at scala.collection.Iterator$class.foreach(Iterator.scala:743)
>     	at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
>     	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>     	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>     	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>     	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>     13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph       - Starting Query switched from CREATED to FAILING.
>     13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph       - Starting Query switched from FAILING to FAILED.
>     13:36:07,088 DEBUG org.apache.flink.runtime.jobmanager.JobManager               - Handled message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) in 1 ms from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
>     13:36:07,524 DEBUG Remoting                                                     - Remote system with address [akka.tcp://flink@127.0.0.1:43640] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
>
>
>
>     The code of the job is quite simple (just a test case). As stated
>     before, it works when using the wrapper script and the web client.
>     I think something is wrong in the submitJar method I posted
>     earlier. Here is the code of the submitted job:
>
>     import org.apache.flink.api.scala._
>     import dbis.flink._
>
>     object load {
>
>       def tupleAToString(t: List[Any]): String = {
>         implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>         val sb = new StringBuilder
>         sb.append(t(0))
>         sb.toString
>       }
>
>       def main(args: Array[String]) {
>         val env = ExecutionEnvironment.getExecutionEnvironment
>         val A = PigStorage().load(env, "/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt", '\t')
>         A.map(t => tupleAToString(t)).writeAsText("/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/result1.out")
>         env.execute("Starting Query")
>       }
>
>     }
>
>
>     Best Regards,
>     Philipp
>
>

