flink-dev mailing list archives

From Deepak Jha <dkjhan...@gmail.com>
Subject Re: Writing multiple streams to multiple kafka
Date Thu, 31 Mar 2016 18:21:35 GMT
It works... Thanks

On Thu, Mar 31, 2016 at 2:23 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> yes you can output the stages to several different Kafka Topics. If you
> don't want to call addSink inside the run() method you somehow have to
> return the handle to your stage3 DataStream, for example:
>
> private val env = StreamExecutionEnvironment.getExecutionEnvironment
> private val src = env.addSource(Source.kafka(streams.abc.topic))
>
> override def run(stream: DataStream[TypeX]) = {
>
>   val stage1 = stream
>     .map(doA)
>     .map(doB)
>     .map(doC)
>
>   // Returns (isTrue: Boolean, somethingElse: TypeT)
>   val stage2 = stage1.map(doD)
>
>   val stage3 = stage2.filter(_.isTrue)
>   val stage4 = stage2.filter(!_.isTrue)
>
>   (stage3, stage4.map(_.toString)) // return both stages
> }
>
> val (stage3, stage4) = run(src)
> stage3.addSink(Write_To_Kafka_Topic_Y)
> stage4.addSink(Write_To_Kafka_Topic_X)
>
>
> On Wed, 30 Mar 2016 at 20:19 Deepak Jha <dkjhanitt@gmail.com> wrote:
>
> > Hi,
> > I'm building a pipeline in Flink with Kafka as both source and sink. As
> > part of this pipeline I have multiple stages in my run method, and I
> > would like to publish the output of some substages to separate Kafka
> > topics. My question is: can I write multiple stages of run to multiple
> > Kafka topics?
> >
> > private val env = StreamExecutionEnvironment.getExecutionEnvironment
> > private val src = env.addSource(Source.kafka(streams.abc.topic))
> >
> > override def run(stream: DataStream[TypeX]) = {
> >
> >   val stage1 = stream
> >     .map(doA)
> >     .map(doB)
> >     .map(doC)
> >
> >   // Returns (isTrue: Boolean, somethingElse: TypeT)
> >   val stage2 = stage1.map(doD)
> >
> >   val stage3 = stage2.filter(_.isTrue)
> >   *stage3.addSink(Write_To_Kafka_Topic_Y)  // Can I do it outside the run method?*
> >   val stage4 = stage2.filter(!_.isTrue)
> >
> >   stage4.map(_.toString)
> > }
> >
> > run(src).addSink(Write_To_Kafka_Topic_X)
> >
> >
> > Ideally I would prefer not to call addSink inside run (as marked in
> > bold above).
> > --
> > Thanks,
> > Deepak Jha
> >
>



-- 
Thanks,
Deepak Jha
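
For readers of the archive, the pattern suggested in the quoted reply (return both branches from run and attach the sinks outside it) can be sketched as a small standalone job. This is only an illustration under stated assumptions: it uses the Flink Scala DataStream API, the case class Checked and the function doD are hypothetical stand-ins for the types in the thread, env.fromElements replaces the Kafka source, and print() replaces the two Kafka sinks.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical record type standing in for the output of doD in the thread.
case class Checked(isTrue: Boolean, somethingElse: String)

object MultiSinkSketch {

  // Hypothetical stand-in for doD: flags non-empty strings as "true".
  def doD(s: String): Checked = Checked(s.nonEmpty, s)

  // Builds the stages and returns BOTH branches, so no sink is added here.
  def run(stream: DataStream[String]): (DataStream[Checked], DataStream[String]) = {
    val stage2 = stream.map(s => doD(s))
    val stage3 = stage2.filter(_.isTrue)
    val stage4 = stage2.filter(!_.isTrue)
    (stage3, stage4.map(_.toString))
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: an in-memory source instead of the Kafka source in the thread.
    val src = env.fromElements("a", "", "b")

    // The caller receives both streams and decides where each one goes.
    val (stage3, stage4) = run(src)
    stage3.print() // stand-in for stage3.addSink(<Kafka producer for topic Y>)
    stage4.print() // stand-in for stage4.addSink(<Kafka producer for topic X>)

    env.execute("multi-sink sketch")
  }
}
```

In a real job, each print() call would be replaced by addSink with a Kafka producer configured for the corresponding topic. Which producer class to use depends on the Flink and Kafka versions in play.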
