flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Submitting jobs from within Scala code
Date Fri, 17 Jul 2015 15:32:30 GMT
This should be rather easy to add with the latest addition of the
ActorGateway and the message decoration.
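
As a rough illustration of the fail-fast idea quoted below, here is a minimal
sketch in plain Scala. Everything in it is hypothetical: Versioned, unwrap, and
the version strings are made-up names for illustration, not part of Flink or of
the ActorGateway. It only shows the shape of decorating a message with a
version and rejecting mismatches on receipt.

```scala
// Hypothetical sketch: none of these names are Flink APIs.
object VersionCheck {

  // Envelope that decorates an arbitrary message with the sender's version.
  final case class Versioned[A](version: String, payload: A)

  // Receiver side: hand out the payload only if the versions match,
  // otherwise fail fast with a descriptive error message.
  def unwrap[A](expectedVersion: String, msg: Versioned[A]): Either[String, A] =
    if (msg.version == expectedVersion) Right(msg.payload)
    else Left(
      s"Version mismatch: client uses ${msg.version}, " +
        s"JobManager runs $expectedVersion")
}
```

With this sketch, VersionCheck.unwrap("0.9.0", VersionCheck.Versioned("0.9.0", "SubmitJob"))
yields Right("SubmitJob"), while a message carrying any other version yields a
Left holding the error text.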

On Fri, Jul 17, 2015 at 5:04 PM, Stephan Ewen <sewen@apache.org> wrote:

> Seems that version mismatches are one of the most common sources of
> issues...
>
> Maybe we should think about putting a version number into the messages (at
> least between client and JobManager) and fail fast on version mismatches...
>
> On Thu, Jul 16, 2015 at 5:56 PM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Good to hear that your problem is solved :-)
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 16, 2015 at 5:45 PM, Philipp Goetze <
>> philipp.goetze@tu-ilmenau.de> wrote:
>>
>>>  Hi Till,
>>>
>>> many thanks for your effort. I finally got it working.
>>>
>>> I'm a bit embarrassed because the issue was solved by using the same
>>> flink-dist JAR as the locally running Flink version. In other words, I
>>> compiled against an older snapshot version than the one I was running :-[
>>>
>>> Best Regards,
>>> Philipp
>>>
>>>
>>> On 16.07.2015 17:35, Till Rohrmann wrote:
>>>
>>>  Hi Philipp,
>>>
>>> to run a Flink program on a cluster from within my IDE, I usually
>>> create a RemoteExecutionEnvironment. Since I have one UDF defined (the
>>> map function which doubles the values), I also need to specify the
>>> jar containing this class. In my case, the jar is called
>>> test-1.0-SNAPSHOT.jar.
>>>
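>>> import org.apache.flink.api.scala._
>>>
>>> def main(args: Array[String]): Unit = {
>>>   // set up the execution environment, shipping the jar that contains the UDF
>>>   val env = ExecutionEnvironment.createRemoteEnvironment(
>>>     "localhost", 6123, "target/test-1.0-SNAPSHOT.jar")
>>>
>>>   val elements = env.fromElements(1, 2, 3, 4, 5)
>>>
>>>   // the UDF: double every value
>>>   val doubled = elements.map(x => 2 * x)
>>>
>>>   // print the results on the TaskManager, prefixed with "TaskManager"
>>>   doubled.printOnTaskManager("TaskManager")
>>>
>>>   // execute program
>>>   env.execute("Flink Scala API Skeleton")
>>> }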
>>>
>>> But I also tried your approach of how to submit jobs to Flink and it
>>> worked for me as well. Therefore, I guess that there is something wrong
>>> with your job. What happens in PigStorage().load?
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze <
>>> philipp.goetze@tu-ilmenau.de> wrote:
>>>
>>>> Hey Till,
>>>>
>>>> I think my previous mail was intercepted or something similar. However,
>>>> you can find my reply below. I already tried a simpler job which just does
>>>> an env.fromElements... but still got the same stack trace.
>>>>
>>>> How do you normally submit jobs (jars) from within the code?
>>>>
>>>> Best Regards,
>>>> Philipp
>>>>
>>>>
>>>> -------- Forwarded Message --------
>>>> Subject: Re: Submitting jobs from within Scala code
>>>> Date: Thu, 16 Jul 2015 14:31:01 +0200
>>>> From: Philipp Goetze <philipp.goetze@tu-ilmenau.de>
>>>> To: user@flink.apache.org
>>>>
>>>> Hey,
>>>>
>>>> from the JobManager I do not get any more hints:
>>>>
>>>> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Received message RequestJobCounts at akka://flink/user/archive from Actor[akka://flink/temp/$gc].
>>>> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist - Handled message RequestJobCounts in 0 ms from Actor[akka://flink/temp/$gc].
>>>> 13:36:06,674 DEBUG org.eclipse.jetty.util.log - RESPONSE /jobsInfo  200
>>>> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message RequestBlobManagerPort at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
>>>> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message RequestBlobManagerPort in 0 ms from Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
>>>> 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection - Received PUT request for content addressable BLOB
>>>> 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at akka://flink/user/jobmanager from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
>>>> 13:36:07,087 INFO  org.apache.flink.runtime.jobmanager.JobManager - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
>>>> 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
>>>> 13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting Query)
>>>> org.apache.flink.runtime.client.JobSubmissionException: The vertex null (null) has no invokable class.
>>>> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
>>>> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>> 	at scala.collection.Iterator$class.foreach(Iterator.scala:743)
>>>> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
>>>> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>>>> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>>>> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from CREATED to FAILING.
>>>> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Starting Query switched from FAILING to FAILED.
>>>> 13:36:07,088 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) in 1 ms from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
>>>> 13:36:07,524 DEBUG Remoting - Remote system with address [akka.tcp://flink@127.0.0.1:43640] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
>>>>
>>>>
>>>>
>>>>
>>>> The code of the job is quite simple (just a test case). As stated
>>>> before, it works when using the wrapper script and the web client. I think
>>>> something is wrong in the submitJar method I posted earlier. But here is the
>>>> code of the submitted job:
>>>>
>>>> import org.apache.flink.api.scala._
>>>> import dbis.flink._
>>>>
>>>> object load {
>>>>
>>>>   def tupleAToString(t: List[Any]): String = {
>>>>     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>>>>     val sb = new StringBuilder
>>>>     sb.append(t(0))
>>>>     sb.toString
>>>>   }
>>>>
>>>>   def main(args: Array[String]) {
>>>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>>>     val A = PigStorage().load(env, "/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt", '\t')
>>>>     A.map(t => tupleAToString(t)).writeAsText("/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/result1.out")
>>>>     env.execute("Starting Query")
>>>>   }
>>>>
>>>> }
>>>>
>>>>
>>>> Best Regards,
>>>> Philipp
>>>>
>>>
>>>
>>>
>>
>
