flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ankur Sharma <an...@stud.uni-saarland.de>
Subject SourceFunction Scala
Date Sun, 06 Mar 2016 19:17:28 GMT
Hello,

I am trying to use a custom source function (declaration given below) for DataStream.
if I add the source to stream using add source: 

val stream = env.addSource(new QueryOneSource(args))
I get following error:  Any explanations and help ??

Error:(14, 31) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
    val stream = env.addSource(new QueryOneSource(args))
                              ^
Error:(14, 31) not enough arguments for method addSource: (implicit evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple],
implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
Unspecified value parameter evidence$16.
    val stream = env.addSource(new QueryOneSource(args))
                              ^

class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
val nextTuple: Tuple // case class Tuple(id: Long, value: Int)
override def run(ctx: SourceContext[Tuple]) = {
  while (true) {
    nextRecord()
    ctx.collect(this.nextTuple)
  }
}

override def cancel() = { }
}

override def nextRecord() = {
}
}

Best,
Ankur Sharma
Information Systems Group
3.15 E1.1 Universit├Ąt des Saarlandes
66123, Saarbr├╝cken Germany
Email: ankur.sharma@mpi-inf.mpg.de <mailto:ankur.sharma@mpi-inf.mpg.de> 
            ankur@stud.uni-saarland.de <mailto:ankur@stud.uni-saarland.de>

Mime
View raw message