flink-user mailing list archives

From Stefano Baghino <stefano.bagh...@radicalbit.io>
Subject Re: SourceFunction Scala
Date Sat, 12 Mar 2016 19:25:32 GMT
Hi Ankur,

I'm catching up on this week's mailing list right now. I hope you have already
solved the issue, but if you haven't: this kind of problem happens when you
use a version of Scala that your Flink dependencies were not compiled for.
Make sure you append the correct Scala version suffix to the dependencies
you're using, matching the Scala version you use for your project.

You can find more details here:
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
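
As a rough illustration of the suffix convention, here is a minimal sketch of a
build definition. It assumes an sbt build, Scala 2.11 and Flink 1.0.0; none of
these are stated in the thread, so adjust them to your setup. With Maven the
equivalent is to write the suffix into the artifactId yourself, for example
flink-streaming-scala_2.11.

```scala
// build.sbt (sketch; the Scala and Flink versions below are assumptions)
scalaVersion := "2.11.7"

val flinkVersion = "1.0.0" // assumed release, not taken from the thread

libraryDependencies ++= Seq(
  // %% appends the project's Scala binary version to the artifact name,
  // so this resolves to flink-streaming-scala_2.11 for Scala 2.11.x
  "org.apache.flink" %% "flink-streaming-scala"    % flinkVersion,
  // the RabbitMQ connector must carry the same Scala suffix
  "org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion
)
```

It is also worth checking that these dependencies actually end up inside the
fat jar (for instance, that they are not scoped as "provided"), since a missing
class at runtime can have that cause as well.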

On Mon, Mar 7, 2016 at 1:19 PM, Ankur Sharma <ankur@stud.uni-saarland.de>
wrote:

> Hi,
>
>
> I am getting the following error while executing the fat jar of the project.
> Any help?
>
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/flink/streaming/util/serialization/DeserializationSchema
>         at org.mpi.debs.Main.main(Main.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.util.serialization.DeserializationSchema
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         ... 1 more
>
>
> Main.scala:
>
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>
>
> object Main {
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.createLocalEnvironment(1)
>     val stream = env.addSource(new RMQSource[String]("localhost","query-one", new SimpleStringSchema))
>     stream.addSink(new SinkFunction[String] {
>       override def invoke(value: String) = {
>         println(value)
>       }
>     })
>     env.execute("QueryOneExecutor")
>   }
> }
>
> Best,
> *Ankur Sharma*
>
> On 06 Mar 2016, at 20:34, Márton Balassi <balassi.marton@gmail.com> wrote:
>
> Hey Ankur,
>
> Add the following line to your imports, and have a look at the referenced
> FAQ. [1]
>
> import org.apache.flink.streaming.api.scala._
>
> [1]
> https://flink.apache.org/faq.html#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters
>
> Best,
>
> Marton
>
> On Sun, Mar 6, 2016 at 8:17 PM, Ankur Sharma <ankur@stud.uni-saarland.de>
> wrote:
>
>> Hello,
>>
>> I am trying to use a custom source function (declaration given below) for
>> a DataStream.
>> If I add the source to the stream using addSource:
>>
>> val stream = env.addSource(new QueryOneSource(args))
>>
>> *I get the following error. Any explanations or help?*
>>
>>
>> Error:(14, 31) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
>>
>>     val stream = env.addSource(new QueryOneSource(args))
>>
>>                               ^
>>
>> Error:(14, 31) not enough arguments for method addSource: (implicit evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
>>
>> Unspecified value parameter evidence$16.
>>
>>     val stream = env.addSource(new QueryOneSource(args))
>>
>>                               ^
>>
>>
>> class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
>>
>>   // case class Tuple(id: Long, value: Int)
>>   var nextTuple: Tuple = _
>>
>>   override def run(ctx: SourceContext[Tuple]): Unit = {
>>     while (true) {
>>       nextRecord()
>>       ctx.collect(this.nextTuple)
>>     }
>>   }
>>
>>   override def cancel(): Unit = { }
>>
>>   // Loads the next record into nextTuple (body left empty in the original post)
>>   def nextRecord(): Unit = {
>>
>>   }
>>
>> }
>>
>> Best,
>> *Ankur Sharma*
>> *Information Systems Group*
>> *3.15 E1.1 Universität des Saarlandes*
>> *66123, Saarbrücken Germany*
>> *Email: ankur.sharma@mpi-inf.mpg.de <ankur.sharma@mpi-inf.mpg.de> *
>> *            ankur@stud.uni-saarland.de <ankur@stud.uni-saarland.de>*
>>
>>
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
