flink-user mailing list archives

From Chen Qin <qinnc...@gmail.com>
Subject Re: some question about side output
Date Thu, 07 Sep 2017 05:54:03 GMT
Hi Qingxiang,

getSideOutput() is only available on the SingleOutputStreamOperator class,
not on DataStream. If you declare your window result as DataStream<...>,
change the declared type to SingleOutputStreamOperator<...> and it should
work fine.

Thanks,
Chen

A code sample is included below for reference.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;

public class SideOutputExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // The anonymous subclass ({}) lets Flink capture the generic element type.
        final OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("tag") {};

        // Declared as SingleOutputStreamOperator (not DataStream) so that
        // getSideOutput() is available further down.
        SingleOutputStreamOperator<String> output = env
                .addSource(new SourceFunction<Tuple2<String, Long>>() {
                    @Override
                    public void run(SourceContext<Tuple2<String, Long>> sourceContext) throws Exception {
                        // Emit three events out of timestamp order: "b" (ts 1)
                        // arrives after the watermark has already advanced to 2.
                        synchronized (sourceContext.getCheckpointLock()) {
                            sourceContext.collect(new Tuple2<>("a", 0L));
                            sourceContext.collect(new Tuple2<>("c", 2L));
                            sourceContext.collect(new Tuple2<>("b", 1L));
                        }
                    }

                    @Override
                    public void cancel() {
                        // Nothing to do: run() returns after emitting three events.
                    }
                })
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>() {
                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement,
                                                              long extractedTimestamp) {
                        // Emit a watermark equal to each element's timestamp.
                        return new Watermark(lastElement.f1);
                    }

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element,
                                                 long previousElementTimestamp) {
                        return element.f1;
                    }
                })
                .timeWindowAll(Time.milliseconds(1))
                // Elements whose window has already closed are sent to lateTag.
                .sideOutputLateData(lateTag)
                .apply(new AllWindowFunction<Tuple2<String, Long>, String, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window,
                                      Iterable<Tuple2<String, Long>> values,
                                      Collector<String> out) throws Exception {
                        for (Tuple2<String, Long> value : values) {
                            out.collect(value.f0);
                        }
                    }
                });

        // Print on-time events.
        output.print();

        // Print late-arriving events.
        output.getSideOutput(lateTag).print();

        env.execute();
    }
}
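To see why "b" is the event that ends up in the side output, here is a small plain-Java model of the routing with no Flink dependency. It is a simplified sketch, not Flink's actual window operator code, and it assumes: a watermark equal to each element's timestamp is emitted right after that element (as the punctuated assigner above does), 1 ms tumbling windows, and zero allowed lateness. The class LatenessModel and its method route() are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink code) of how the sample's events are routed
// between the main output and the late-data side output.
public class LatenessModel {

    // Assumption: no allowedLateness() is configured, so lateness is 0.
    static final long ALLOWED_LATENESS = 0L;

    /** Returns the on-time keys at index 0 and the late keys at index 1. */
    public static List<List<String>> route(String[] keys, long[] timestamps) {
        List<String> onTime = new ArrayList<>();
        List<String> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark seen yet
        for (int i = 0; i < keys.length; i++) {
            long ts = timestamps[i];
            // A 1 ms tumbling window holding ts is [ts, ts + 1), so the last
            // timestamp it covers (its maxTimestamp) is ts itself.
            long windowMaxTimestamp = ts;
            // The element is late once its window's maxTimestamp plus the
            // allowed lateness is at or behind the current watermark.
            if (windowMaxTimestamp + ALLOWED_LATENESS <= watermark) {
                late.add(keys[i]);
            } else {
                onTime.add(keys[i]);
            }
            // Punctuated assigner: watermark = element timestamp, and the
            // watermark never moves backwards.
            watermark = Math.max(watermark, ts);
        }
        List<List<String>> result = new ArrayList<>();
        result.add(onTime);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        // Same emission order as the source above: a@0, c@2, b@1.
        List<List<String>> routed =
                route(new String[] {"a", "c", "b"}, new long[] {0L, 2L, 1L});
        System.out.println("on time: " + routed.get(0)); // on time: [a, c]
        System.out.println("late:    " + routed.get(1)); // late:    [b]
    }
}
```

Under these assumptions, "a" and "c" are on time, while "b" (timestamp 1) arrives after the watermark has reached 2, so the Flink job is expected to emit it through getSideOutput(lateTag) rather than through the window function.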


On Wed, Sep 6, 2017 at 6:41 AM, 马庆祥 <xymaqingxiang777@gmail.com> wrote:

> Hi, all:
>
> Using Flink's side output
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html>
> feature, we can get a stream of the data that was discarded as late. But when I use
> the getSideOutput() method, I get the following error message:
>
> This is the link to the documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#getting-late-data-as-a-side-output
>
> [image: inline image 2]
>
> Note: the code is as follows:
> [image: inline image 1]
>
> Thanks & Regards
> Qingxiang Ma
>
>
