flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Maximilian Michels <...@apache.org>
Subject Re: Does 'DataStream.writeAsCsv' suppose to work like this?
Date Wed, 18 Nov 2015 16:27:57 GMT
Yes, that does make sense! Thank you for explaining. Have you made the
change yet? I couldn't find it on the master.

On Wed, Nov 18, 2015 at 5:16 PM, Stephan Ewen <sewen@apache.org> wrote:
> That makes sense...
>
> On Mon, Oct 26, 2015 at 12:31 PM, Márton Balassi <balassi.marton@gmail.com>
> wrote:
>>
>> Hey Max,
>>
>> The solution I am proposing is not flushing on every record, but it makes
>> sure to forward the flushing from the sinkfunction to the outputformat
>> whenever it is triggered. Practically this means that the buffering is done
>> (almost) solely in the sink and not in the outputformat any more.
>>
>> On Mon, Oct 26, 2015 at 10:11 AM, Maximilian Michels <mxm@apache.org>
>> wrote:
>>>
>>> Not sure whether we really want to flush at every invoke call. If you
>>> want to flush every time, you may want to set the update condition to 0
>>> milliseconds. That way, flush will be called every time. In the API this is
>>> exposed by using the FileSinkFunctionByMillis. If you flush every time,
>>> performance might degrade.
>>>
>>> By the way, you may also use the RollingFileSink which splits the output
>>> into several files for each hour/week/day. You can then be sure those files
>>> are already completely written to HDFS.
>>>
>>> Best regards,
>>> Max
>>>
>>> On Mon, Oct 26, 2015 at 8:36 AM, Márton Balassi
>>> <balassi.marton@gmail.com> wrote:
>>>>
>>>> The problem persists in the current master, simply a format.flush() is
>>>> needed here [1]. I'll do a quick hotfix, thanks for the report again!
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java#L99
>>>>
>>>> On Mon, Oct 26, 2015 at 8:23 AM, Márton Balassi
>>>> <balassi.marton@gmail.com> wrote:
>>>>>
>>>>> Hey Rex,
>>>>>
>>>>> Writing half-baked records is definitely unwanted, thanks for spotting
>>>>> this. Most likely it can be solved by adding a flush at the end of every
>>>>> invoke call, let me check.
>>>>>
>>>>> Best,
>>>>>
>>>>> Marton
>>>>>
>>>>> On Mon, Oct 26, 2015 at 7:56 AM, Rex Ge <lungothrin@gmail.com>
wrote:
>>>>>>
>>>>>> Hi, flinkers!
>>>>>>
>>>>>> I'm new to this whole thing,
>>>>>> and it seems to me that '
>>>>>> org.apache.flink.streaming.api.datastream.DataStream.writeAsCsv(String,
>>>>>> WriteMode, long)' does not work properly.
>>>>>> To be specific, data were not flushed by update frequency when write
>>>>>> to HDFS.
>>>>>>
>>>>>> what make it more disturbing is that, if I check the content with
>>>>>> 'hdfs dfs -cat xxx', sometimes I got partial records.
>>>>>>
>>>>>>
>>>>>> I did a little digging in flink-0.9.1.
>>>>>> And it turns out all
>>>>>> 'org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(IN)'
>>>>>> does
>>>>>> is pushing data to
>>>>>> 'org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream'
>>>>>> which is a delegate of  'org.apache.hadoop.fs.FSDataOutputStream'.
>>>>>>
>>>>>> In this scenario, 'org.apache.hadoop.fs.FSDataOutputStream' is never
>>>>>> flushed.
>>>>>> Which result in data being held in local buffer, and 'hdfs dfs -cat
>>>>>> xxx' might return partial records.
>>>>>>
>>>>>>
>>>>>> Does 'DataStream.writeAsCsv' suppose to work like this? Or I messed
up
>>>>>> somewhere?
>>>>>>
>>>>>>
>>>>>> Best regards and thanks for your time!
>>>>>>
>>>>>> Rex
>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message