flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Deepak Jha <dkjhan...@gmail.com>
Subject Re: Kafka As Sink in Flink Streaming
Date Sat, 23 Jan 2016 02:31:11 GMT
Hi Robert,
No it was compile time issue. Actually i tried to write a string as well
but it did not work for me. Just for clarity my flink-connector-kafka
version is 0.10.1....

I was able to fix the issue... SimpleStringSchema should be replaced with
JavaDefaultStringSchema as the later is doing conversion from String to
Byte Array.

Thanks for the help though.

On Fri, Jan 22, 2016 at 1:34 PM, Robert Metzger <rmetzger@apache.org> wrote:

> Hi,
>
> did you get any error message while executing the job? I don't think you
> can serialize the "Demo" type with the "SimpleStringSchema".
>
>
>
>
> On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <dkjhanitt@gmail.com> wrote:
>
> > Hi Devs,
> > I just started using Flink and would like to ass kafka as Sink. I went
> > through the documentation but so far I've not succeeded in writing to
> Kafka
> > from Flink....
> >
> > I' building application in Scala.... Here is my code snippet
> >
> > case class *Demo*(city: String, country: String, zipcode: Int)
> >
> > The map stage returns an instance of Demo type
> >
> >  val env = StreamExecutionEnvironment.getExecutionEnvironment
> >
> >   val properties = new Properties()
> >
> >   properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
> >   properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
> >   properties.setProperty("group.id", "test_topic")
> > val mapToDemo: String => Demo = {//Implementation}
> >
> > val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new
> > SimpleStringSchema, properties))
> >
> > stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("
> 127.0.0.1:9092
> > ",
> > "test_topic", new SimpleStringSchema()))
> >
> > Can anyone explain me what am I doing wrong in adding Kafka as Sink ?
> > --
> > Thanks,
> > Deepak Jha
> >
>



-- 
Thanks,
Deepak Jha

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message