From: Ufuk Celebi
Date: Mon, 16 Jan 2017 22:03:27 +0100
Subject: Re: WindowFunction to push data from Kafka to S3
To: user@flink.apache.org
Can you check the log files of the TaskManagers and the JobManager? There is
no obvious reason why the collection should not work.

On another note: the rolling file sink might be what you are looking for:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html

On Fri, Jan 13, 2017 at 5:48 PM, Samra Kasim wrote:
> Hi,
>
> I am reading messages off a Kafka topic and want to process them with
> Flink and save them to S3. It was pointed out to me that the stream
> processing output won't be saved to S3 directly, because S3 doesn't allow
> data to be appended to a file, so I want to convert the Kafka stream into
> batches and save those to S3. Based on other user questions/answers, it
> looks like this is possible by using windowing to break the stream into
> batches and create files. I have written the following code, but it
> doesn't work, and I am not getting any errors either. I have a sys.out
> that shows the tuple is being processed, but it might not be emitted in
> out.collect. Can someone help me figure out what the issue may be? Thanks!
>
> public class S3Sink {
>
>     public static void main(String[] args) throws Exception {
>         Map<String, String> configs =
>                 ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");
>
>         final ParameterTool parameterTool = ParameterTool.fromMap(configs);
>
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableSysoutLogging();
>         env.getConfig().setGlobalJobParameters(parameterTool);
>
>         DataStream<String> messageStream = env
>                 .addSource(new FlinkKafkaConsumer09<>(
>                         parameterTool.get("kafka.topic"),
>                         new SimpleStringSchema(),
>                         parameterTool.getProperties()));
>
>         String uuid = UUID.randomUUID().toString();
>
>         DataStreamSink<Tuple2<String, String>> tuple2DataStream = messageStream
>                 .flatMap(new Tupler())
>                 .keyBy(0)
>                 .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>                 .apply(new MyWindowFunction())
>                 .writeAsText("s3://flink-test/flink-output-stream/" + uuid + "testdoc.txt");
>
>         env.execute();
>     }
>
>     private static class Tupler
>             implements FlatMapFunction<String, Tuple2<String, String>> {
>
>         @Override
>         public void flatMap(String record, Collector<Tuple2<String, String>> out)
>                 throws Exception {
>             out.collect(new Tuple2<>("record", record));
>         }
>     }
>
>     private static class MyWindowFunction implements
>             WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {
>
>         @Override
>         public void apply(Tuple key, TimeWindow timeWindow,
>                           Iterable<Tuple2<String, String>> input,
>                           Collector<Tuple2<String, String>> out) throws Exception {
>             for (Tuple2<String, String> in : input) {
>                 System.out.println(in);
>                 out.collect(in);
>             }
>         }
>     }
> }
>
> --
>
> Thanks,
>
> Sam
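
[For the archive: the rolling file sink suggested above could be sketched roughly as follows, based on the `BucketingSink` described in the linked Flink 1.2 filesystem connector docs. The S3 path, bucket format, and batch size are illustrative, and the socket source is a stand-in for the original `FlinkKafkaConsumer09`; this is a sketch, not a drop-in replacement for the job above.]

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class S3BucketingSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The sink batches records into part files per bucket and rolls them,
        // so no appending to existing S3 objects is needed.
        BucketingSink<String> sink =
                new BucketingSink<>("s3://flink-test/flink-output-stream");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm")); // one bucket per minute
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(1024 * 1024 * 64); // roll a new part file at ~64 MB

        env.socketTextStream("localhost", 9999) // stand-in source for the Kafka consumer
           .addSink(sink);

        env.execute("bucketing-sink-sketch");
    }
}
```

With this approach the windowing step becomes unnecessary for the purpose of batching writes: the sink itself decides when to roll files, by size and by bucket.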