flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Vinay Patil <vinay18.pa...@gmail.com>
Subject [Discussion] Query regarding Join
Date Mon, 13 Jun 2016 16:53:51 GMT
Hi,

I have a question regarding the join operation, consider the following
dummy example:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStreamSource<Integer> sourceStream =
env.fromElements(10,20,23,25,30,33,102,18);
DataStreamSource<Integer> destStream = env.fromElements(20,30,40,50,60,10);

sourceStream.join(destStream)
.where(new ElementSelector())
.equalTo(new ElementSelector())
.window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
.apply(new JoinFunction<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override
public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
return paramIN1;
}
}).print();

I perfectly get the elements that are matching in both the streams, however
my requirement is to write these matched elements and also the unmatched
elements to sink(S3)

How do I get the unmatched elements from each stream ?

Regards,
Vinay Patil

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message