beam-user mailing list archives

From Raghu Angadi <rang...@google.com>
Subject Re: Write bulks files from streaming app
Date Fri, 20 Jul 2018 19:06:33 GMT
On Fri, Jul 20, 2018 at 2:58 AM Jozef Vilcek <jozo.vilcek@gmail.com> wrote:

> Hm, that is an interesting idea: make the write composite and merge the
> files later. I do not know Beam well yet.
> I will look into it and learn about the Wait.on() transform (I wonder how
> it will work with late firings). Thanks!
>
> But it keeps me thinking...
> Does it make sense to have support for this in the SDK?
> Is my use case that uncommon? Not a fit for Beam? How do others out there
> do similar things?
>

The SDK does allow it. It looks like you are running into scaling and memory
limits with the amount of state stored in large windows. This is something that
will improve. I am not familiar enough with the Flink runner to comment on
specifics. I was mainly thinking of a workaround.
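
A minimal sketch of the merge step in that workaround, for illustration only.
It uses plain java.nio on a local directory; PartFileMerger and mergeParts are
hypothetical names and are not part of Beam. On HDFS the same logic would go
through a filesystem API that understands HDFS paths, and it would run inside
the finalizer DoFn. Byte-level concatenation is only valid for formats such as
newline-delimited text. Avro container files each carry their own header, so
they would need an Avro-aware merge instead.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not a Beam API): merges small part files into one file. */
public final class PartFileMerger {
    private PartFileMerger() {}

    /**
     * Concatenates every regular file in partsDir, in file-name order, into target.
     * Assumes target lives outside partsDir. Returns the number of part files merged.
     */
    public static int mergeParts(Path partsDir, Path target) throws IOException {
        List<Path> parts;
        try (Stream<Path> listing = Files.list(partsDir)) {
            parts = listing
                    .filter(Files::isRegularFile)
                    .sorted()
                    .collect(Collectors.toList());
        }
        // Write to a temporary sibling first so readers never see a partial target.
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            for (Path part : parts) {
                // Streams the part file to the output without loading it fully into memory.
                Files.copy(part, out);
            }
        }
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        return parts.size();
    }
}
```

Calling PartFileMerger.mergeParts(directoryX, mergedFile) once per closed window
keeps the frequent small writes cheap while leaving a single large file behind.
The part files are left in place, so cleaning up directory_X is a separate step.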

Raghu.


>
> On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <rangadi@google.com> wrote:
>
>> One option (though it requires more code): write to smaller files with
>> frequent triggers to directory_X and, once the window properly closes, copy
>> all the files to a single file in your own DoFn. This is certainly more code
>> on your part, but might be worth it. You can use the Wait.on() transform to run
>> your finalizer DoFn right after the window that writes the smaller files closes.
>>
>>
>> On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jozo.vilcek@gmail.com>
>> wrote:
>>
>>> Hey,
>>>
>>> I am looking for advice.
>>>
>>> I am trying to do stream processing with Beam on the Flink runtime.
>>> I read data from Kafka and process it (the processing is not important
>>> here), and at the same time I want to store the consumed data in history
>>> storage, which is HDFS, for archiving and reprocessing.
>>>
>>> Now, the part that writes batches to HDFS is giving me a hard time.
>>> Logically, I want to do:
>>>
>>> fileIO = FileIO.writeDynamic()
>>>         .by(destinationFn)
>>>         .via(AvroIO.sink(avroClass))
>>>         .to(path)
>>>         .withNaming(namingFn)
>>>         .withTempDirectory(tmp)
>>>         .withNumShards(shards)
>>>
>>> data
>>>    .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>>>    .saveTo(fileIO)
>>>
>>>
>>> This write generates 3 operators in the Flink execution graph, which I do
>>> not fully understand yet.
>>>
>>> Now, the problem is that I am not able to run this at scale.
>>>
>>> If I want to write files big enough to avoid having lots of files on
>>> HDFS, I keep running into OOM. With Flink, I use the RocksDB state backend,
>>> and I was warned about this JIRA, which is probably related to my OOM:
>>> https://issues.apache.org/jira/browse/FLINK-8297
>>> Therefore, I need to trigger more often and with smaller batches, which
>>> leads to too many files on HDFS.
>>>
>>> The question is whether there is some path I do not see that makes this
>>> work (writing bulks of data of my choosing to HDFS without running into
>>> memory trouble). Also, keeping the whole window's data in state until it is
>>> written to the output filesystem involves more IO.
>>>
>>> Thanks for any thoughts and guidelines,
>>> Jozef
>>>
>>>
