flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Window Function on AllWindowed Stream - Combining Kafka Topics
Date Wed, 03 May 2017 09:11:54 GMT
Hi,
An AllWindow operator requires an AllWindowFunction instead of a WindowFunction. In your
case, the keyBy() seems to be in the wrong place. To get a keyed window, you have to write
something akin to:

inputStream
  .keyBy(…)
  .window(…)
  .apply(…) // or reduce()
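
To make the keyed variant concrete without a Flink setup, here is a minimal plain-Java sketch that only models the pairing behaviour of keyBy(eventTime).countWindow(2).apply(...). It uses no Flink classes. `Event`, `mergePairs` and the string concatenation are hypothetical stand-ins for TopicPojo and the real JSON merge. Records are buffered per key, and as soon as one key holds two records the "window" fires with exactly those two and its buffer is purged. Parallelism, state backends and fault tolerance are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink classes) of keyBy(eventTime).countWindow(2).apply(...).
public class KeyedCountWindowSketch {

    // Hypothetical stand-in for TopicPojo: an event time plus a JSON payload.
    public static final class Event {
        final long eventTime;
        final String payload;

        public Event(long eventTime, String payload) {
            this.eventTime = eventTime;
            this.payload = payload;
        }
    }

    // Buffers elements per key; when a key's buffer reaches windowSize, the
    // "window function" runs on exactly those elements and the buffer is purged.
    public static List<String> mergePairs(List<Event> input, int windowSize) {
        Map<Long, List<Event>> buffers = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : input) {
            List<Event> buf = buffers.computeIfAbsent(e.eventTime, k -> new ArrayList<>());
            buf.add(e);
            if (buf.size() == windowSize) {
                // Window function: merge all payloads that share this key.
                StringBuilder merged = new StringBuilder("key=" + e.eventTime + ":");
                for (Event x : buf) {
                    merged.append(' ').append(x.payload);
                }
                out.add(merged.toString());
                buffers.remove(e.eventTime);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Topic-A and Topic-B records interleaved; each eventTime occurs exactly twice.
        List<Event> input = Arrays.asList(
                new Event(1000L, "A1"),
                new Event(2000L, "A2"),
                new Event(2000L, "B2"),
                new Event(1000L, "B1"));
        for (String line : mergePairs(input, 2)) {
            System.out.println(line);
        }
    }
}
```

With the interleaved input in main(), this prints `key=2000: A2 B2` and then `key=1000: A1 B1`. Each pair shares an event time even though the records arrived out of order. In this model, a key that only ever receives one record never produces output.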

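In your case, you key the stream, but the keying is “lost” again because you then apply
a flatMap(), which returns a regular, non-keyed DataStream. On a non-keyed stream only the
*All window variants, such as countWindowAll(), are available. That’s why you have an all-window and not a keyed window.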
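
For contrast, here is a plain-Java sketch of what a non-keyed countWindowAll(2) does once the keying has been dropped. Again it uses no Flink classes, and `pairConsecutive` and the `eventTime:payload` strings are hypothetical. There is a single buffer for the whole stream, so every two consecutive records form a window regardless of their event time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (no Flink classes) of a non-keyed countWindowAll(2).
public class AllCountWindowSketch {

    // One global buffer for the whole stream: the window fires on every
    // windowSize consecutive elements, regardless of their event time.
    public static List<List<String>> pairConsecutive(List<String> input, int windowSize) {
        List<List<String>> windows = new ArrayList<>();
        List<String> buf = new ArrayList<>();
        for (String s : input) {
            buf.add(s);
            if (buf.size() == windowSize) {
                windows.add(new ArrayList<>(buf));
                buf.clear();
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // Interleaved Topic-A and Topic-B records, written as "eventTime:payload".
        List<String> input = Arrays.asList("1000:A1", "2000:A2", "2000:B2", "1000:B1");
        System.out.println(pairConsecutive(input, 2));
    }
}
```

For the input in main(), it prints `[[1000:A1, 2000:A2], [2000:B2, 1000:B1]]`. Each window mixes two different event times, which is not the join you are after.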

Best,
Aljoscha
> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
> 
> Hi,
> 
> I am trying to combine two Kafka topics using a single Kafka consumer on a list of
> topics, and then convert the JSON strings in the stream to POJOs. Then I want to join them
> via keyBy (on the event time field) and merge each pair into a single fat JSON. For this, I was
> planning to use a windowed stream and apply a window function to it. The assumption is that
> Topic-A and Topic-B can be joined on event time, and that only one pair (Topic-A JSON, Topic-B JSON)
> will be present with the same eventTime. Hence I was planning to use countWindow(2) after keyBy
> on eventTime.
> 
> I have a couple of questions about this:
>
> 1. Is this approach fine for merging the topics and creating a single JSON?
> 2. The window function on the AllWindowedStream doesn't seem to work. Any pointers would
> be greatly appreciated.
> 
> Code snippet:
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> logger.info("Flink Stream Window Charger has started");
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
> properties.setProperty("group.id", "group-0011");
> properties.setProperty("auto.offset.reset", "smallest");
>
> List<String> names = new ArrayList<>();
> names.add("Topic-A");
> names.add("Topic-B");
>
> DataStream<String> stream = env.addSource(
>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>
> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>     .keyBy((eventTime) -> TopicPojo.getEventTime());
> 
> List<String> where = new ArrayList<String>();
>
> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
>
> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>
> data_charging.addSink(new SinkFunction<String>() {
>     public void invoke(String value) throws Exception {
>         // Yet to be implemented - Merge two POJO into one
>     }
> });
>
> try {
>     env.execute();
> } catch (Exception e) {
>     return;
> }
> }
> }
> 
> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>         ObjectMapper mapper = new ObjectMapper();
>         out.collect(mapper.writeValueAsString(value));
>     }
> }
> 
> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>
>     @Override
>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>             throws Exception {
>         int count = 0;
>         for (TopicPojo in : arg2) {
>             count++;
>         }
>         // Test result - to be modified
>         out.collect("Window: " + window + "count: " + count);
>     }
> }
> 
> class Deserializer implements MapFunction<String, TopicPojo> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public TopicPojo map(String value) throws IOException {
>         // TODO Auto-generated method stub
>         ObjectMapper mapper = new ObjectMapper();
>         TopicPojo obj = null;
>         try {
>             System.out.println(value);
>             obj = mapper.readValue(value, TopicPojo.class);
>         } catch (JsonParseException e) {
>             // TODO Auto-generated catch block
>             throw new IOException("Failed to deserialize JSON object.");
>         } catch (JsonMappingException e) {
>             // TODO Auto-generated catch block
>             throw new IOException("Failed to deserialize JSON object.");
>         } catch (IOException e) {
>             // TODO Auto-generated catch block
>             throw new IOException("Failed to deserialize JSON object.");
>         }
>         return obj;
>     }
> }
> 
> 
> I am getting the following error:
> The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type
> AllWindowedStream<String,GlobalWindow> is not applicable for the arguments (MyWindowFunction)
> 
> Kindly give your input.
> 
> Regards,
> Vijay Raajaa GS 
> 

