flink-dev mailing list archives

From Kostas Tzoumas <ktzou...@apache.org>
Subject Re: Flink interactive Scala shell
Date Thu, 16 Apr 2015 08:15:26 GMT
Great, let us know if you run into any issues.

Can you create a JIRA on the REPL and link to your repository for the
community to track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s <nikolaas.steenbergen@gmail.com>
wrote:

> Thanks for the feedback, guys!
> Apparently the Scala shell compiles the shell input into some kind of virtual
> directory.
> It should be possible to create a jar from its contents and then hand it
> over to Flink for execution in some way.
> I will investigate further.
>
> cheers,
> Nikolaas
>
> 2015-04-15 11:20 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>
> > To give a bit of context for the exception:
> >
> > To execute a program, the classes of the user functions need to be
> > available to the executing TaskManagers.
> >
> >  - If you execute locally from the IDE, all classes are in the classpath
> > anyway.
> >  - If you use the remote environment, you need to attach the jar file to
> > the environment.
> >
> >  - In your case (repl), you need to make sure that the generated classes
> > are given to the TaskManager. In that sense, the approach is probably
> > similar to the case of executing with a remote environment - only that you
> > do not have a jar file up front, but need to generate it on the fly. As
> > Robert mentioned, https://github.com/apache/flink/pull/35 may have a first
> > solution to that. Other approaches are also possible, like simply always
> > bundling all classes in the directory where the repl puts its generated
> > classes.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <aljoscha@apache.org>
> > wrote:
> >
> > > I will look into it once I have some time (end of this week, or next
> > > week probably)
> > >
> > > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <rmetzger@apache.org>
> > > wrote:
> > > > Hey Nikolaas,
> > > >
> > > > > Thank you for posting on the mailing list. I met Nikolaas today in
> > > > > person and we talked a bit about an interactive shell for Flink,
> > > > > potentially also an integration with Zeppelin.
> > > > >
> > > > > Great stuff that I'm really looking forward to :)
> > > > >
> > > > > We were wondering if somebody on the list has some experience with the
> > > > > Scala shell.
> > > > > I've also pointed Nikolaas to this PR:
> > > > > https://github.com/apache/flink/pull/35.
> > > >
> > > > Best,
> > > > Robert
> > > >
> > > >
> > > > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <nikolaas.steenbergen@gmail.com>
> > > > > wrote:
> > > >
> > > >> Hi!
> > > > >> I am trying to implement a Scala shell for Flink.
> > > > >>
> > > > >> I've started with a simple Scala object whose main function will drop
> > > > >> the user into the interactive Scala shell (REPL) at one point:
> > > > >>
> > > >> import scala.tools.nsc.interpreter.ILoop
> > > >> import scala.tools.nsc.Settings
> > > >>
> > > >> object Job {
> > > >>   def main(args: Array[String]) {
> > > >>
> > > >>     val repl = new ILoop()
> > > >>     repl.settings = new Settings()
> > > >>
> > > >>     // enable this line to use scala in intellij
> > > >>     repl.settings.usejavacp.value = true
> > > >>
> > > >>     repl.createInterpreter()
> > > >>
> > > >>     // start scala interpreter shell
> > > >>     repl.process(repl.settings)
> > > >>
> > > >>     repl.closeInterpreter()
> > > >>     }
> > > >>   }
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Now I am trying to execute the word count example as in:
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> scala> import org.apache.flink.api.scala._
> > > >>
> > > >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > > >>
> > > > >> scala> val text = env.fromElements("To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,")
> > > > >>
> > > > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
> > > >>
> > > >> scala> counts.print()
> > > >>
> > > >> scala> env.execute("Flink Scala Api Skeleton")
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > > >> However, I am running into the following error:
> > > >>
> > > > >> env.execute("Flink Scala Api Skeleton")
> > > > >> org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
> > > > >> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > > > >> at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > > > >> at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> > > > >> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> > > > >> Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration due to missing classes.
> > > > >> at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> > > > >> at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> > > > >> at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> > > > >> at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> > > > >> at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> > > > >> at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> > > > >> ... 8 more
> > > > >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> > > > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > > >> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > > >> at java.security.AccessController.doPrivileged(Native Method)
> > > > >> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > > > >> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > > > >> at java.lang.Class.forName0(Native Method)
> > > > >> at java.lang.Class.forName(Class.java:274)
> > > > >> at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> > > > >> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> > > > >> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> > > > >> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> > > > >> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > > >> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > > >> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> > > > >> at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> > > > >> at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> > > > >> at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> > > > >> ... 13 more
> > > > >>
> > > > >> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> > > > >> at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> > > > >> at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> > > > >> at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> > > > >> at .<init>(<console>:12)
> > > > >> at .<clinit>(<console>)
> > > > >> at .<init>(<console>:7)
> > > > >> at .<clinit>(<console>)
> > > > >> at $print(<console>)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> > > > >> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> > > > >> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> > > > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> > > > >> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> > > > >> at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> > > > >> at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> > > > >> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> > > > >> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> > > > >> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> > > > >> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> > > > >> at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> > > > >> at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > > >> at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > > > >> at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> > > > >> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> > > > >> at org.myorg.quickstart.Job$.main(Job.scala:37)
> > > > >> at org.myorg.quickstart.Job.main(Job.scala)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > > >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >> at java.lang.reflect.Method.invoke(Method.java:606)
> > > > >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> > > >>
> > > >>
> > > >>
> > > > >> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or
> > > > >> can point me in some direction?
> > > >>
> > > >> thanks,
> > > >> Nikolaas
> > > >>
> > >
> >
>
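
The idea discussed in the thread, bundling the classes the REPL generates into a jar on the fly, can be sketched roughly as below. This is only an illustration under assumptions that are not from the thread: it assumes the generated classes have already been written to a real directory on disk, and the names ReplJarSketch, listFiles and packClasses are hypothetical helpers. It uses only java.util.jar from the JDK and is not the approach taken in the pull request mentioned above.

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}

// Hypothetical helper, not part of Flink or the Scala REPL.
object ReplJarSketch {

  /** Recursively lists all regular files below `dir`. */
  def listFiles(dir: File): Seq[File] = {
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap { f => if (f.isDirectory) listFiles(f) else Seq(f) }
  }

  /**
   * Packs every .class file below `classDir` into `jarFile`.
   * Entry names are the paths relative to `classDir`, so package
   * directories are preserved. Returns the entry names written.
   */
  def packClasses(classDir: File, jarFile: File): Seq[String] = {
    val root = classDir.toURI
    val out = new JarOutputStream(new FileOutputStream(jarFile))
    try {
      val classFiles = listFiles(classDir).filter(_.getName.endsWith(".class"))
      classFiles.map { f =>
        val entryName = root.relativize(f.toURI).getPath
        out.putNextEntry(new JarEntry(entryName))
        val in = new FileInputStream(f)
        try {
          // Copy the class file contents into the current jar entry.
          val buf = new Array[Byte](8192)
          var n = in.read(buf)
          while (n != -1) {
            out.write(buf, 0, n)
            n = in.read(buf)
          }
        } finally in.close()
        out.closeEntry()
        entryName
      }
    } finally out.close()
  }
}
```

The resulting jar file could then be attached to a remote environment in the way Stephan describes. How the REPL's virtual directory is exposed on disk, and how the jar is handed to Flink, depends on the Scala and Flink versions in use.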
