From: Maximilian Michels
Date: Wed, 18 Nov 2015 17:27:57 +0100
Subject: Re: Does 'DataStream.writeAsCsv' suppose to work like this?
To: user@flink.apache.org

Yes, that does make sense! Thank you for explaining. Have you made the
change yet? I couldn't find it on the master.
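[Editor's note: the change asked about here amounts to forwarding flush() from the sink function to its output format. Below is a minimal, self-contained sketch of that pattern in plain Java; SimpleFormat and FlushingSink are hypothetical stand-ins for illustration, not Flink's actual OutputFormat/SinkFunction classes.]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for an output format that writes records to a stream.
class SimpleFormat {
    private final OutputStream out;
    SimpleFormat(OutputStream out) { this.out = out; }
    void writeRecord(String record) throws IOException {
        out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }
    // The gist of the hotfix: expose flush so the sink can forward it.
    void flush() throws IOException { out.flush(); }
}

// Hypothetical stand-in for a sink function that forwards its flush trigger.
class FlushingSink {
    private final SimpleFormat format;
    FlushingSink(SimpleFormat format) { this.format = format; }
    void invoke(String record) throws IOException {
        format.writeRecord(record);
        // Forward the flush to the format whenever the sink's flushing
        // condition triggers (here: on every record, i.e. the 0 ms
        // update-condition case mentioned in the thread).
        format.flush();
    }
}

public class FlushForwardDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        FlushingSink sink = new FlushingSink(new SimpleFormat(target));
        sink.invoke("a,1");
        sink.invoke("b,2");
        System.out.println(target.toString("UTF-8"));
    }
}
```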
On Wed, Nov 18, 2015 at 5:16 PM, Stephan Ewen wrote:
> That makes sense...
>
> On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi wrote:
>>
>> Hey Max,
>>
>> The solution I am proposing is not flushing on every record, but it makes
>> sure to forward the flushing from the sinkfunction to the outputformat
>> whenever it is triggered. Practically this means that the buffering is done
>> (almost) solely in the sink and not in the outputformat any more.
>>
>> On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels wrote:
>>>
>>> Not sure whether we really want to flush at every invoke call. If you
>>> want to flush every time, you may want to set the update condition to 0
>>> milliseconds. That way, flush will be called every time. In the API this is
>>> exposed by using the FileSinkFunctionByMillis. If you flush every time,
>>> performance might degrade.
>>>
>>> By the way, you may also use the RollingFileSink which splits the output
>>> into several files for each hour/week/day. You can then be sure those files
>>> are already completely written to HDFS.
>>>
>>> Best regards,
>>> Max
>>>
>>> On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi wrote:
>>>>
>>>> The problem persists in the current master, simply a format.flush() is
>>>> needed here [1]. I'll do a quick hotfix, thanks for the report again!
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
>>>>
>>>> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi wrote:
>>>>>
>>>>> Hey Rex,
>>>>>
>>>>> Writing half-baked records is definitely unwanted, thanks for spotting
>>>>> this. Most likely it can be solved by adding a flush at the end of every
>>>>> invoke call, let me check.
>>>>>
>>>>> Best,
>>>>>
>>>>> Marton
>>>>>
>>>>> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge wrote:
>>>>>>
>>>>>> Hi, flinkers!
>>>>>>
>>>>>> I'm new to this whole thing,
>>>>>> and it seems to me that
>>>>>> 'org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String, WriteMode, long)'
>>>>>> does not work properly.
>>>>>> To be specific, data are not flushed at the update frequency when
>>>>>> writing to HDFS.
>>>>>>
>>>>>> What makes it more disturbing is that, if I check the content with
>>>>>> 'hdfs dfs -cat xxx', sometimes I get partial records.
>>>>>>
>>>>>>
>>>>>> I did a little digging in flink-0.9.1.
>>>>>> It turns out that all
>>>>>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>>>>>> does is push data to
>>>>>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream',
>>>>>> which is a delegate of 'org.apache.hadoop.fs.FSDataOutputStream'.
>>>>>>
>>>>>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>>>>>> flushed,
>>>>>> which results in data being held in a local buffer, so 'hdfs dfs -cat
>>>>>> xxx' might return partial records.
>>>>>>
>>>>>>
>>>>>> Is 'DataStream.writeAsCsv' supposed to work like this? Or did I mess
>>>>>> up somewhere?
>>>>>>
>>>>>>
>>>>>> Best regards and thanks for your time!
>>>>>>
>>>>>> Rex
>>>>>
>>>>>
>>>>
>>>
>>
>
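[Editor's note: the partial-record symptom described above can be reproduced with plain java.io, using BufferedOutputStream as a stand-in for the client-side buffer behind FSDataOutputStream. The 8-byte buffer is artificially small so the effect shows up with short records; real buffers are just larger, not different in kind.]

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Data written through a buffered stream stays in the local buffer until
// flush(), so a concurrent reader ("hdfs dfs -cat") can observe output
// that stops short of the records the writer has already emitted.
public class PartialRecordDemo {
    public static void main(String[] args) throws IOException {
        // Stand-in for the file a reader would see.
        ByteArrayOutputStream fileSystem = new ByteArrayOutputStream();
        BufferedOutputStream out = new BufferedOutputStream(fileSystem, 8);

        out.write("a,1\n".getBytes(StandardCharsets.UTF_8));
        out.write("b,2\n".getBytes(StandardCharsets.UTF_8));
        out.write("c,3\n".getBytes(StandardCharsets.UTF_8)); // still buffered

        // Without an explicit flush, the reader's view is incomplete:
        System.out.println("before flush: [" + fileSystem.toString("UTF-8") + "]");

        out.flush();
        System.out.println("after flush:  [" + fileSystem.toString("UTF-8") + "]");
    }
}
```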