flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Window Function on AllWindowed Stream - Combining Kafka Topics
Date Wed, 03 May 2017 09:58:36 GMT
The approach could work, but if an event from stream A can go unmatched by an event in
stream B, you will have lingering state that never goes away. For such cases it might be
better to write a custom CoProcessFunction, as sketched here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html

The idea is to keep events from each side in state and emit a result when you get the event
from the other side. You also set a cleanup timer in case no other event arrives to make sure
that state eventually goes away.

Best,
Aljoscha
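A framework-free sketch of the pattern described above, assuming events from both topics share a string key (for example the event time) and at most one event per side arrives per key. It is not Flink code: the two `HashMap`s stand in for keyed `ValueState`, the `timers` map stands in for event-time timers registered through the context's `TimerService`, and `advanceWatermark()` plays the role of `onTimer()` firing. The class name `BufferedJoinSketch` and the `CLEANUP_DELAY` value are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free model of a two-input join with per-key buffering and cleanup
// timers (the shape a CoProcessFunction would take). Not Flink API code.
class BufferedJoinSketch {
    // Hypothetical: event-time units to wait for the other side before giving up.
    static final long CLEANUP_DELAY = 10L;

    // Per-key buffered event from each side (stands in for keyed ValueState).
    final Map<String, String> leftState = new HashMap<>();
    final Map<String, String> rightState = new HashMap<>();
    // Per-key cleanup deadline (stands in for a registered event-time timer).
    final Map<String, Long> timers = new HashMap<>();
    // Joined results, formatted as "key:left+right".
    final List<String> output = new ArrayList<>();

    void processLeft(String key, String value, long timestamp) {
        process(true, key, value, timestamp);
    }

    void processRight(String key, String value, long timestamp) {
        process(false, key, value, timestamp);
    }

    private void process(boolean fromLeft, String key, String value, long timestamp) {
        Map<String, String> mine = fromLeft ? leftState : rightState;
        Map<String, String> theirs = fromLeft ? rightState : leftState;
        String other = theirs.remove(key);
        if (other != null) {
            // The other side is already buffered: emit the pair and cancel the cleanup.
            timers.remove(key);
            output.add(key + ":" + (fromLeft ? value : other) + "+" + (fromLeft ? other : value));
        } else {
            // First side to arrive: buffer it and schedule a cleanup deadline.
            mine.put(key, value);
            timers.put(key, timestamp + CLEANUP_DELAY);
        }
    }

    // Fire every timer whose deadline has been reached and drop the unmatched state.
    void advanceWatermark(long watermark) {
        timers.entrySet().removeIf(timer -> {
            if (timer.getValue() > watermark) {
                return false;
            }
            leftState.remove(timer.getKey());
            rightState.remove(timer.getKey());
            return true;
        });
    }

    int bufferedCount() {
        return leftState.size() + rightState.size();
    }

    public static void main(String[] args) {
        BufferedJoinSketch join = new BufferedJoinSketch();
        join.processLeft("t1", "A1", 1L);
        join.processRight("t1", "B1", 2L);
        join.processLeft("t2", "A2", 3L); // never matched
        join.advanceWatermark(20L);       // cleanup deadline for "t2" (13) has passed
        System.out.println(join.output + ", buffered=" + join.bufferedCount());
        // prints "[t1:A1+B1], buffered=0"
    }
}
```

In an actual `CoProcessFunction`, `processElement1()` and `processElement2()` would do what `processLeft()` and `processRight()` do here, and `onTimer()` would clear both state entries for the key.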
> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
> 
> Sure. Thanks for the pointer; let me reorder the operations. Any comments on the approach
> I followed for merging the topics and creating a single JSON?
> 
> Regards,
> Vijay Raajaa G S
> 
> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> An AllWindow operator requires an AllWindowFunction instead of a WindowFunction. In your
> case, the keyBy() seems to be in the wrong place; to get a keyed window you have to write
> something akin to:
> 
> inputStream
>   .keyBy(…)
>   .window(…)
>   .apply(…) // or reduce()
> 
> In your case, you key the stream and then the keying is “lost” again because you apply a
> flatMap() afterwards, which returns a regular (non-keyed) DataStream. That’s why you have an
> all-window and not a keyed window.
> 
> Best,
> Aljoscha
> 
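The difference between an all-window and a keyed window can be modelled without Flink. This sketch uses a hypothetical `Event` type standing in for `TopicPojo`, keyed by `eventTime`, and mimics tumbling count windows of size 2: the `countWindowAll` model fires on any two consecutive elements, while the `keyedCountWindow` model keeps one buffer per key and only pairs elements that share an event time. It is an illustration of the semantics, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free model of tumbling count windows; not Flink API code.
class CountWindowSketch {
    // Hypothetical stand-in for TopicPojo: an event time plus the source topic.
    static final class Event {
        final long eventTime;
        final String topic;

        Event(long eventTime, String topic) {
            this.eventTime = eventTime;
            this.topic = topic;
        }

        @Override
        public String toString() {
            return topic + "@" + eventTime;
        }
    }

    // Like countWindowAll(size): one global buffer that fires every `size` elements, whatever their key.
    static List<List<Event>> countWindowAll(List<Event> events, int size) {
        List<List<Event>> fired = new ArrayList<>();
        List<Event> buffer = new ArrayList<>();
        for (Event e : events) {
            buffer.add(e);
            if (buffer.size() == size) {
                fired.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        return fired;
    }

    // Like keyBy(eventTime).countWindow(size): one buffer per key, fired once that key has `size` elements.
    static List<List<Event>> keyedCountWindow(List<Event> events, int size) {
        List<List<Event>> fired = new ArrayList<>();
        Map<Long, List<Event>> buffers = new HashMap<>();
        for (Event e : events) {
            List<Event> buffer = buffers.computeIfAbsent(e.eventTime, k -> new ArrayList<>());
            buffer.add(e);
            if (buffer.size() == size) {
                fired.add(buffer);
                buffers.remove(e.eventTime);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1, "A"), new Event(2, "A"), new Event(1, "B"), new Event(2, "B"));
        System.out.println(countWindowAll(events, 2));   // prints [[A@1, A@2], [B@1, B@2]]
        System.out.println(keyedCountWindow(events, 2)); // prints [[A@1, B@1], [A@2, B@2]]
    }
}
```

A key that never receives its second element keeps its buffer forever in this model, which is the lingering-state problem mentioned at the top of the thread.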
>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I am trying to combine two Kafka topics using a single Kafka consumer on a list of topics,
>> and then convert the JSON strings in the stream to POJOs. Then I want to join them via keyBy
>> (on the event time field) and merge them into a single fat JSON; for that I was planning to
>> use a windowed stream and apply a window function to it. The assumption is that Topic-A and
>> Topic-B can be joined on event time and that only one pair (Topic-A JSON, Topic-B JSON) will
>> be present with the same eventTime. Hence I was planning to use a countWindow(2) after keyBy
>> on eventTime.
>> 
>> I have a couple of questions about this:
>> 
>> 1. Is the approach fine for merging topics and creating a single JSON?
>> 2. The window function on the all-window stream doesn't seem to work; any pointers would be
>> greatly appreciated.
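For the merge step left as "Yet to be implemented" in the snippet's sink, one possible shape, assuming each deserialized event has been flattened into a `Map<String, String>` of field names to values. In the real job the JSON would more likely be produced with the Jackson `ObjectMapper` already used in `Deserializer`; the tiny `toJson` here only escapes quotes and backslashes and exists to keep the sketch dependency-free. The class name `FatJsonSketch` and the collision rule (keep A's value, store a differing B value under `"<label>.<field>"`) are hypothetical, not something from the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Dependency-free sketch of merging two flat records into one "fat" JSON object.
class FatJsonSketch {
    // Merge b into a copy of a. Fields only in b are added as-is; a field present in both
    // with a different value keeps a's value, and b's value goes under "<bLabel>.<field>".
    // Fields with equal values (such as a shared eventTime) appear only once.
    static Map<String, String> merge(Map<String, String> a, Map<String, String> b, String bLabel) {
        Map<String, String> merged = new LinkedHashMap<>(a);
        for (Map.Entry<String, String> field : b.entrySet()) {
            String name = field.getKey();
            if (!merged.containsKey(name)) {
                merged.put(name, field.getValue());
            } else if (!Objects.equals(merged.get(name), field.getValue())) {
                merged.put(bLabel + "." + name, field.getValue());
            }
        }
        return merged;
    }

    // Render a flat map as a JSON object with string values (only quotes and backslashes are escaped).
    static String toJson(Map<String, String> fields) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append(quote(field.getKey())).append(':').append(quote(field.getValue()));
        }
        return json.append('}').toString();
    }

    static String quote(String s) {
        if (s == null) {
            return "null";
        }
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // Hypothetical sample records sharing the join key.
        Map<String, String> topicA = new LinkedHashMap<>();
        topicA.put("eventTime", "1000");
        topicA.put("valueA", "x");
        Map<String, String> topicB = new LinkedHashMap<>();
        topicB.put("eventTime", "1000");
        topicB.put("valueB", "y");
        System.out.println(toJson(merge(topicA, topicB, "Topic-B")));
        // prints {"eventTime":"1000","valueA":"x","valueB":"y"}
    }
}
```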
>> 
>> Code snippet:
>> 
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> logger.info("Flink Stream Window Charger has started");
>> 
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>> properties.setProperty("group.id", "group-0011");
>> properties.setProperty("auto.offset.reset", "smallest");
>> 
>> List<String> names = new ArrayList<>();
>> names.add("Topic-A");
>> names.add("Topic-B");
>> 
>> DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>> 
>> DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>> 
>> List<String> where = new ArrayList<String>();
>> 
>> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
>> 
>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>> 
>> data_charging.addSink(new SinkFunction<String>() {
>>     public void invoke(String value) throws Exception {
>>         // Yet to be implemented - Merge two POJO into one
>>     }
>> });
>> 
>> try {
>>     env.execute();
>> } catch (Exception e) {
>>     return;
>> }
>> }
>> }
>> 
>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>     private static final long serialVersionUID = 1L;
>> 
>>     @Override
>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>         ObjectMapper mapper = new ObjectMapper();
>>         out.collect(mapper.writeValueAsString(value));
>>     }
>> }
>> 
>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>     @Override
>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>>             throws Exception {
>>         int count = 0;
>>         for (TopicPojo in : arg2) {
>>             count++;
>>         }
>>         // Test Result - TO be modified
>>         out.collect("Window: " + window + "count: " + count);
>>     }
>> }
>> 
>> class Deserializer implements MapFunction<String, TopicPojo> {
>>     private static final long serialVersionUID = 1L;
>> 
>>     @Override
>>     public TopicPojo map(String value) throws IOException {
>>         // TODO Auto-generated method stub
>>         ObjectMapper mapper = new ObjectMapper();
>>         TopicPojo obj = null;
>>         try {
>>             System.out.println(value);
>>             obj = mapper.readValue(value, TopicPojo.class);
>>         } catch (JsonParseException e) {
>>             // TODO Auto-generated catch block
>>             throw new IOException("Failed to deserialize JSON object.");
>>         } catch (JsonMappingException e) {
>>             // TODO Auto-generated catch block
>>             throw new IOException("Failed to deserialize JSON object.");
>>         } catch (IOException e) {
>>             // TODO Auto-generated catch block
>>             throw new IOException("Failed to deserialize JSON object.");
>>         }
>>         return obj;
>>     }
>> }
>> 
>> 
>> I am getting the following error: The method apply(AllWindowFunction<String,R,GlobalWindow>)
>> in the type AllWindowedStream<String,GlobalWindow> is not applicable for the arguments
>> (MyWindowFunction).
>> 
>> Kindly give your input.
>> 
>> Regards,
>> Vijay Raajaa GS 
>> 
> 
> 

