flink-user mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: WindowFunction to push data from Kafka to S3
Date Mon, 16 Jan 2017 21:03:27 GMT
Can you check the log files of the TaskManagers and JobManager?

I don't see an obvious reason why the collection should not work.

On another note: the rolling file sink might be what you are looking for.

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html
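To make the rolling idea concrete, here is a JDK-only sketch of the concept. It is not the connector's actual code, and `RollingPartWriter` and `maxChars` are hypothetical names. Records are buffered in an in-progress part. Once the part reaches a size threshold, it is closed as one complete object. Each finished part is written once and never appended to, which is the constraint S3 imposes. The real connector also buckets parts (for example by time). As far as I know, it only marks parts as finished on completed checkpoints, so checkpointing should be enabled when using it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Conceptual sketch of size-based rolling (hypothetical, not Flink's sink code).
 * Records go into an in-progress part; once it holds at least maxChars
 * characters the part is "rolled": closed and kept as one finished object.
 * A finished part is never appended to, which matches S3's write-once objects.
 */
final class RollingPartWriter {
    private final int maxChars;                               // hypothetical roll threshold
    private final StringBuilder inProgress = new StringBuilder();
    private final List<String> finishedParts = new ArrayList<>();

    RollingPartWriter(int maxChars) {
        if (maxChars <= 0) {
            throw new IllegalArgumentException("maxChars must be positive");
        }
        this.maxChars = maxChars;
    }

    /** Adds one newline-terminated record and rolls the part if it is full. */
    void write(String record) {
        inProgress.append(record).append('\n');
        if (inProgress.length() >= maxChars) {
            roll();
        }
    }

    /** Closes the current part; a real sink would upload it as one object here. */
    void roll() {
        if (inProgress.length() > 0) {
            finishedParts.add(inProgress.toString());
            inProgress.setLength(0);
        }
    }

    List<String> finishedParts() {
        return Collections.unmodifiableList(finishedParts);
    }

    public static void main(String[] args) {
        RollingPartWriter writer = new RollingPartWriter(10);
        for (String record : new String[] {"a1", "b2", "c3", "d4", "e5"}) {
            writer.write(record);
        }
        writer.roll(); // flush the partial tail part, e.g. on a timer or at shutdown
        System.out.println(writer.finishedParts().size()); // prints 2
    }
}
```

The first four records fill and roll the first part. The final explicit `roll()` turns the leftover record into a second, smaller part. A time-based trigger plays the same role in a streaming job where input never "ends".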

On Fri, Jan 13, 2017 at 5:48 PM, Samra Kasim
<samra.kasim@thehumangeo.com> wrote:
> Hi,
>
> I am reading messages off a Kafka topic and want to process them through
> Flink and save them to S3. It was pointed out to me that streaming the Kafka
> data straight to S3 won't work because S3 doesn't allow data to be appended
> to a file, so I want to convert the Kafka stream into batches and save each
> batch to S3. Based on other users' questions and answers, it looks like this
> is possible by using windows to break the stream into batches and creating
> one file per batch. I have written the following code, but it doesn't work,
> and I am not getting any errors either. A System.out.println in the window
> function shows that the tuples are being processed, but they might not be
> emitted by out.collect. Can someone help me figure out what the issue may be?
> Thanks!
>
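> import java.util.Map;
> import java.util.UUID;
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> public class S3Sink {
>
>     public static void main(String[] args) throws Exception {
>         // ConfigUtils is a project-specific helper (not part of Flink)
>         // that loads the YAML config file into a Map
>         Map<String, String> configs =
>                 ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");
>         final ParameterTool parameterTool = ParameterTool.fromMap(configs);
>
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableSysoutLogging();
>         env.getConfig().setGlobalJobParameters(parameterTool);
>
>         // Read raw strings from Kafka; the consumer properties
>         // (bootstrap servers, group id, ...) come from the config file
>         DataStream<String> messageStream = env
>                 .addSource(new FlinkKafkaConsumer09<String>(
>                         parameterTool.get("kafka.topic"),
>                         new SimpleStringSchema(),
>                         parameterTool.getProperties()));
>
>         String uuid = UUID.randomUUID().toString();
>
>         // Wrap each record in a tuple, key by field 0, collect 5-second
>         // processing-time windows, and write the window output to S3
>         messageStream
>                 .flatMap(new Tupler())
>                 .keyBy(0)
>                 .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>                 .apply(new MyWindowFunction())
>                 .writeAsText("s3://flink-test/flink-output-stream/" + uuid + "testdoc.txt");
>
>         env.execute();
>     }
>
>     // Emits every incoming record as ("record", record), so all records share one key
>     private static class Tupler implements FlatMapFunction<String, Tuple2<String, String>> {
>         @Override
>         public void flatMap(String record, Collector<Tuple2<String, String>> out) throws Exception {
>             out.collect(new Tuple2<String, String>("record", record));
>         }
>     }
>
>     // Prints and forwards every element of the window unchanged
>     private static class MyWindowFunction
>             implements WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {
>         @Override
>         public void apply(Tuple key, TimeWindow timeWindow,
>                           Iterable<Tuple2<String, String>> input,
>                           Collector<Tuple2<String, String>> out) throws Exception {
>             for (Tuple2<String, String> in : input) {
>                 System.out.println(in);
>                 out.collect(in);
>             }
>         }
>     }
> }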
>
>
> --
>
> Thanks,
>
> Sam
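The batching idea from the quoted message (tumbling 5-second windows, one file per batch) can be sketched in plain Java. This is an illustration under stated assumptions, not Flink code. `TumblingBatcher`, `objectKey` and the key naming scheme are hypothetical, timestamps are assumed to be non-negative milliseconds, and the window offset is zero. Keep in mind that in the quoted job, windowing only groups the elements. `writeAsText` still writes every window's output to the single fixed path chosen at startup, so it does not by itself produce one file per window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * JDK-only illustration of tumbling-window batching (hypothetical, not Flink code):
 * each record goes to the window [start, start + size) that contains its
 * timestamp, and each window's records form one batch, i.e. one object.
 */
final class TumblingBatcher {

    /** Start of the window containing the timestamp; assumes ms >= 0 and zero offset. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Groups records by window start; the TreeMap keeps windows in time order. */
    static Map<Long, List<String>> batch(long[] timestampsMs, String[] records, long windowSizeMs) {
        if (timestampsMs.length != records.length) {
            throw new IllegalArgumentException("one timestamp per record expected");
        }
        Map<Long, List<String>> batches = new TreeMap<>();
        for (int i = 0; i < records.length; i++) {
            long start = windowStart(timestampsMs[i], windowSizeMs);
            batches.computeIfAbsent(start, k -> new ArrayList<>()).add(records[i]);
        }
        return batches;
    }

    /** Hypothetical naming scheme: one object key per window. */
    static String objectKey(String prefix, long windowStartMs) {
        return prefix + "window-" + windowStartMs + ".txt";
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_999L, 5_000L, 12_345L};
        String[] records = {"r1", "r2", "r3", "r4"};
        Map<Long, List<String>> batches = batch(timestamps, records, 5_000L);
        for (Map.Entry<Long, List<String>> e : batches.entrySet()) {
            System.out.println(objectKey("flink-output-stream/", e.getKey()) + " -> " + e.getValue());
        }
    }
}
```

Running `main` prints one line per window:

    flink-output-stream/window-0.txt -> [r1, r2]
    flink-output-stream/window-5000.txt -> [r3]
    flink-output-stream/window-10000.txt -> [r4]

Each window's batch would then be written once as its own object, rather than appended to a shared file.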
