flink-dev mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: Kafka As Sink in Flink Streaming
Date Fri, 22 Jan 2016 21:34:00 GMT
Hi,

Did you get any error message while executing the job? I don't think you
can serialize the "Demo" type with the "SimpleStringSchema".
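For reference, one way around this (a sketch only, not tested against this exact Flink/Kafka connector version) is to map `Demo` to a `String` before the sink, so that `SimpleStringSchema` applies to what actually reaches the producer. The `toMessage` helper and its CSV-style encoding below are illustrative assumptions, not part of any Flink API:

```scala
// The case class from the original question.
case class Demo(city: String, country: String, zipcode: Int)

// Hypothetical helper: encode a Demo as a plain-text Kafka message.
// Any Demo => String encoding would work with SimpleStringSchema;
// CSV is just the simplest choice for the sketch.
object DemoFormat {
  def toMessage(d: Demo): String = s"${d.city},${d.country},${d.zipcode}"
}

// In the job, the sink would then carry Strings instead of Demos,
// roughly (sketch, assuming the producer constructor from the question):
//   stream.map(mapToDemo)
//     .map(DemoFormat.toMessage _)
//     .addSink(new FlinkKafkaProducer[String]("127.0.0.1:9092",
//       "test_topic", new SimpleStringSchema()))
```

The alternative is to implement a custom `SerializationSchema` for `Demo` and pass that to the producer instead of `SimpleStringSchema`.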
On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <dkjhanitt@gmail.com> wrote:

> Hi Devs,
> I just started using Flink and would like to add Kafka as a sink. I went
> through the documentation, but so far I've not succeeded in writing to Kafka
> from Flink.
>
> I'm building the application in Scala. Here is my code snippet:
>
> case class Demo(city: String, country: String, zipcode: Int)
>
> The map stage returns an instance of Demo type
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
> properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
> properties.setProperty("group.id", "test_topic")
>
> val mapToDemo: String => Demo = { /* implementation */ }
>
> val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new
> SimpleStringSchema, properties))
>
> stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092",
> "test_topic", new SimpleStringSchema()))
>
> Can anyone explain what I am doing wrong in adding Kafka as a sink?
> --
> Thanks,
> Deepak Jha
>
