flink-user mailing list archives

From Ankur Sharma <an...@stud.uni-saarland.de>
Subject Re: SourceFunction Scala
Date Mon, 07 Mar 2016 12:19:44 GMT
Hi, 


I am getting the following error when executing the project's fat jar. Any help?


Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema
        at org.mpi.debs.Main.main(Main.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more
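
A NoClassDefFoundError like this usually means the class was available at compile time but is not on the runtime classpath. One way to narrow it down is to check which classes are actually visible when the jar runs. A minimal sketch, assuming it is run with the same classpath as the fat jar; `ClasspathCheck` and `isLoadable` are hypothetical names, not part of Flink:

```scala
object ClasspathCheck {
  // Hypothetical helper: true if the named class is visible to this object's class loader.
  def isLoadable(className: String): Boolean =
    try {
      // initialize = false: only locate the class, do not run its static initializers.
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => false
    }

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "java.lang.String", // always present, serves as a sanity check
      "org.apache.flink.streaming.util.serialization.DeserializationSchema"
    )
    names.foreach { name =>
      val status = if (isLoadable(name)) "found" else "MISSING"
      println(s"$name -> $status")
    }
  }
}
```

If the Flink class is reported as missing, the jar was probably assembled without the Flink streaming dependencies, which is a build configuration question rather than a code problem.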


Main.scala: 

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema


object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val stream = env.addSource(new RMQSource[String]("localhost","query-one", new SimpleStringSchema))
    stream.addSink(new SinkFunction[String] {
      override def invoke(value: String) = {
        println(value)
      }
    })
    env.execute("QueryOneExecutor")
  }
}

Best,
Ankur Sharma

> On 06 Mar 2016, at 20:34, Márton Balassi <balassi.marton@gmail.com> wrote:
> 
> Hey Ankur,
> 
> Add the following line to your imports, and have a look at the referenced FAQ. [1]
> 
> import org.apache.flink.streaming.api.scala._
> 
> [1] https://flink.apache.org/faq.html#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters
> 
> Best,
> 
> Marton
> 
> On Sun, Mar 6, 2016 at 8:17 PM, Ankur Sharma <ankur@stud.uni-saarland.de> wrote:
> Hello,
> 
> I am trying to use a custom source function (declaration given below) for a DataStream.
> If I add the source to the stream using addSource:
> 
> val stream = env.addSource(new QueryOneSource(args))
> 
> I get the following error. Any explanations or help?
> 
> Error:(14, 31) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
>     val stream = env.addSource(new QueryOneSource(args))
>                               ^
> Error:(14, 31) not enough arguments for method addSource: (implicit evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
> Unspecified value parameter evidence$16.
>     val stream = env.addSource(new QueryOneSource(args))
>                               ^
> 
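> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> 
> class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
>   // Flag checked by run() so that cancel() can stop the loop.
>   @volatile private var running = true
>   private var nextTuple: Tuple = _ // case class Tuple(id: Long, value: Int)
> 
>   override def run(ctx: SourceContext[Tuple]): Unit = {
>     while (running) {
>       nextRecord()
>       ctx.collect(this.nextTuple)
>     }
>   }
> 
>   override def cancel(): Unit = { running = false }
> 
>   // Loads the next record into nextTuple (body omitted in the original message).
>   def nextRecord(): Unit = {
>   }
> }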
> 
> Best,
> Ankur Sharma
> Information Systems Group
> 3.15 E1.1 Universität des Saarlandes
> 66123, Saarbrücken Germany
> Email: ankur.sharma@mpi-inf.mpg.de
>             ankur@stud.uni-saarland.de
> 
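
The "evidence parameter" error quoted above can be reproduced without Flink, which may make the suggested import easier to understand. The following is only a sketch of the mechanism in Scala 2: `TypeInfo`, `ScalaApiImplicits`, `Env` and `Demo` are hypothetical stand-ins, not Flink classes, and the real Scala API is assumed to supply its TypeInformation implicits through the `org.apache.flink.streaming.api.scala._` import in a comparable way.

```scala
// Hypothetical stand-in for a type class such as Flink's TypeInformation.
trait TypeInfo[T] { def describe: String }

// Hypothetical stand-in for the implicits that a wildcard package import brings into scope.
object ScalaApiImplicits {
  implicit val longInfo: TypeInfo[Long] = new TypeInfo[Long] { def describe = "Long" }
  implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { def describe = "String" }
}

object Env {
  // The context bound `T: TypeInfo` desugars to an extra implicit parameter list.
  // The compiler calls that parameter an "evidence parameter" in its error messages.
  def addSource[T: TypeInfo](elements: Seq[T]): String =
    s"source of ${implicitly[TypeInfo[T]].describe} with ${elements.size} element(s)"
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Without this import, the call below does not compile in Scala 2:
    // "could not find implicit value for evidence parameter of type TypeInfo[Long]".
    import ScalaApiImplicits._
    println(Env.addSource(Seq(1L, 2L, 3L)))
  }
}
```

Commenting out the import inside `main` reproduces the same kind of compile error as in the quoted message, which is why adding the wildcard import resolves it.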

