From: Kostas Tzoumas
To: dev@flink.apache.org
Date: Thu, 16 Apr 2015 10:15:26 +0200
Subject: Re: Flink interactive Scala shell

Great, let us know if you run into any issues. Can you create a JIRA on the
REPL and link to your repository for the community to track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s wrote:

> Thanks for the feedback guys!
> Apparently the Scala shell compiles the shell input to some kind of virtual
> directory.
> It should be possible to create a jar from its contents and then hand it
> over to Flink for execution in some way.
> I will investigate further.
>
> cheers,
> Nikolaas
>
> 2015-04-15 11:20 GMT+02:00 Stephan Ewen :
>
> > To give a bit of context for the exception:
> >
> > To execute a program, the classes of the user functions need to be
> > available to the executing TaskManagers.
> >
> > - If you execute locally from the IDE, all classes are on the classpath
> > anyway.
> > - If you use the remote environment, you need to attach the jar file to
> > the environment.
> >
> > - In your case (REPL), you need to make sure that the generated classes
> > are given to the TaskManagers. In that sense, the approach is probably
> > similar to the case of executing with a remote environment - only that you
> > do not have a jar file up front, but need to generate it on the fly. As
> > Robert mentioned, https://github.com/apache/flink/pull/35 may have a first
> > solution to that. Other approaches are also possible, like simply always
> > bundling all classes in the directory where the REPL puts its generated
> > classes.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek wrote:
> >
> > > I will look into it once I have some time (end of this week, or next
> > > week probably)
> > >
> > > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger wrote:
> > > > Hey Nikolaas,
> > > >
> > > > Thank you for posting on the mailing list. I've met Nikolaas today in
> > > > person and we were talking a bit about an interactive shell for Flink,
> > > > potentially also an integration with Zeppelin.
> > > >
> > > > Great stuff that I'm really looking forward to :)
> > > >
> > > > We were wondering if somebody from the list has some experience with
> > > > the Scala shell.
> > > > I've also pointed Nikolaas to this PR:
> > > > https://github.com/apache/flink/pull/35.
> > > >
> > > > Best,
> > > > Robert
> > > >
> > > >
> > > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <nikolaas.steenbergen@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi!
> > > >> I am trying to implement a Scala shell for Flink.
> > > >>
> > > >> I've started with a simple Scala object whose main function will drop
> > > >> the user into the interactive Scala shell (REPL) at one point:
> > > >>
> > > >> import scala.tools.nsc.interpreter.ILoop
> > > >> import scala.tools.nsc.Settings
> > > >>
> > > >> object Job {
> > > >>   def main(args: Array[String]) {
> > > >>
> > > >>     val repl = new ILoop()
> > > >>     repl.settings = new Settings()
> > > >>
> > > >>     // enable this line to use scala in intellij
> > > >>     repl.settings.usejavacp.value = true
> > > >>
> > > >>     repl.createInterpreter()
> > > >>
> > > >>     // start scala interpreter shell
> > > >>     repl.process(repl.settings)
> > > >>
> > > >>     repl.closeInterpreter()
> > > >>   }
> > > >> }
> > > >>
> > > >> Now I am trying to execute the word count example as in:
> > > >>
> > > >> scala> import org.apache.flink.api.scala._
> > > >>
> > > >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > > >>
> > > >> scala> val text = env.fromElements("To be, or not to be,--that is the
> > > >> question:--", "Whether 'tis nobler in the mind to suffer", "The slings
> > > >> and arrows of outrageous fortune", "Or to take arms against a sea of
> > > >> troubles,")
> > > >>
> > > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map {
> > > >> (_, 1) }.groupBy(0).sum(1)
> > > >>
> > > >> scala> counts.print()
> > > >>
> > > >> scala> env.execute("Flink Scala Api Skeleton")
> > > >>
> > > >> However I am running into the following error:
> > > >>
> > > >> env.execute("Flink Scala Api Skeleton")
> > > >> org.apache.flink.runtime.client.JobExecutionException:
> > > >> java.lang.RuntimeException: The initialization of the DataSource's
> > > >> outputs caused an error: The type serializer factory could not load its
> > > >> parameters from the configuration due to missing classes.
> > > >>   at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > > >>   at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > > >>   at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > > >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >>   at java.lang.reflect.Method.invoke(Method.java:606)
> > > >>   at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> > > >>   at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> > > >> Caused by: java.lang.RuntimeException: The type serializer factory could
> > > >> not load its parameters from the configuration due to missing classes.
> > > >>   at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> > > >>   at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> > > >>   at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> > > >>   at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> > > >>   at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> > > >>   at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> > > >>   ... 8 more
> > > >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> > > >>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > >>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > >>   at java.security.AccessController.doPrivileged(Native Method)
> > > >>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > >>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > >>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > >>   at java.lang.Class.forName0(Native Method)
> > > >>   at java.lang.Class.forName(Class.java:274)
> > > >>   at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> > > >>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> > > >>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> > > >>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> > > >>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > >>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > >>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> > > >>   at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> > > >>   at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> > > >>   at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> > > >>   ... 13 more
> > > >>
> > > >>   at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> > > >>   at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> > > >>   at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> > > >>   at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> > > >>   at .<init>(<console>:12)
> > > >>   at .<clinit>(<console>)
> > > >>   at .<init>(<console>:7)
> > > >>   at .<clinit>(<console>)
> > > >>   at $print()
> > > >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >>   at java.lang.reflect.Method.invoke(Method.java:606)
> > > >>   at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> > > >>   at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> > > >>   at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> > > >>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> > > >>   at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> > > >>   at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> > > >>   at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> > > >>   at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> > > >>   at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> > > >>   at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> > > >>   at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> > > >>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> > > >>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > >>   at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > >>   at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> > > >>   at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> > > >>   at org.myorg.quickstart.Job$.main(Job.scala:37)
> > > >>   at org.myorg.quickstart.Job.main(Job.scala)
> > > >>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >>   at java.lang.reflect.Method.invoke(Method.java:606)
> > > >>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> > > >>
> > > >> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or
> > > >> can point me in some direction?
> > > >>
> > > >> thanks,
> > > >> Nikolaas
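[Editor's note] Nikolaas's plan above — bundling the classes the REPL compiles into a jar and handing that jar to Flink — can be sketched roughly as follows. This is a minimal sketch, not Flink or Scala compiler API: the `ReplJarPackager` name is hypothetical, and it assumes the REPL's generated classes have already been flushed from the virtual directory to a real directory on disk.

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}

// Hypothetical helper: walk the directory the REPL writes its generated
// classes into and bundle every file into a single jar, which could then
// be shipped to the TaskManagers together with the job.
object ReplJarPackager {
  def packageDir(classDir: File, jarFile: File): Unit = {
    val jar = new JarOutputStream(new FileOutputStream(jarFile))
    try {
      // Recurse through subdirectories, building jar entry names like
      // "pkg/Foo.class" from the path relative to classDir.
      def addFiles(dir: File, prefix: String): Unit = {
        for (f <- Option(dir.listFiles()).getOrElse(Array.empty[File])) {
          if (f.isDirectory) {
            addFiles(f, prefix + f.getName + "/")
          } else {
            jar.putNextEntry(new JarEntry(prefix + f.getName))
            val in = new FileInputStream(f)
            try {
              val buf = new Array[Byte](4096)
              var n = in.read(buf)
              while (n != -1) { jar.write(buf, 0, n); n = in.read(buf) }
            } finally in.close()
            jar.closeEntry()
          }
        }
      }
      addFiles(classDir, "")
    } finally jar.close()
  }
}
```

The resulting jar would then take the role of the user jar that is normally attached when submitting a program, so the TaskManagers can resolve the anonymous classes (`$anon$2$$anon$1` in the trace above) that the REPL generates.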
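[Editor's note] For Stephan's second bullet — attaching the jar when using the remote environment — the Scala API's `ExecutionEnvironment.createRemoteEnvironment` accepts paths of jars to ship with the job. A sketch, where the host, port, and jar path are placeholders and the jar is assumed to contain the REPL-generated classes:

```scala
import org.apache.flink.api.scala._

object RemoteWordCount {
  def main(args: Array[String]) {
    // Placeholders: point this at the JobManager and at the jar built
    // from the REPL's generated classes.
    val env = ExecutionEnvironment.createRemoteEnvironment(
      "jobmanager-host", 6123, "/tmp/repl-generated-classes.jar")

    val text = env.fromElements("To be, or not to be")
    val counts = text
      .flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    counts.print()
  }
}
```

Because the jar travels with the job, the TaskManagers' user-code classloader can resolve the serialized functions, which should avoid the `ClassNotFoundException` seen in the trace above.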